Skip to content

Commit

Permalink
Trie recovery (#5861)
Browse files Browse the repository at this point in the history
* Try recover from TrieException when missing key

* Add multiple peers

* simplify peer allocations

increase timeout

* Add logs and usings

* break db on queue empty

* different test

* more logs

* test

* test

* test

* commented out eth67&68

* more info

* Support for HealingPatriciaTree

* Revert "Support for HealingPatriciaTree"

This reverts commit 1c6e52b.

* Add support for snap sync recovery

* Add IsMainProcessingThread guards

* Initialize healing when pruning only. Mark main processor.

* revert `tests`

* refactor

* refactor

* add artificial throw

* debug state root issue

* fix compatibility

* Revert "debug state root issue"

This reverts commit 37f46cf.

* fix state root access in PersistentStorageProvider

* Disable HealingTrieStore

* enable snap protocol all time

* fix exception handling when querying peer

* name with generics

* simplify testing

* more logs

* more logs

* whitespace fix

* add length check to snap rlp recovery

* try test differently

* change for tests

* one more test

* more logs

* fix

* Better retry

* more tries

* remove peer head check

* More logs

* test more

* more test

* more test

* more test

* Fix

* Refactor logging

* Improve logging more

* better logging

* fix formatting

* refactor initialization

* Add HealingTrieTests

* Add HealingTrieStoreTests

* Add RecoveryTests

* Remove TESTING code

* Some more changes

* improvement

* whitespace

* fix test

* fix test names

* Validate RLP even when doing snap sync

* simplified refactor

* Move to ArrayPoolList<Recovery>

* always run healing even in archive

* Ignore Netherminds in CanGetSnapData until snap server is implemented
  • Loading branch information
LukaszRozmej authored Jul 28, 2023
1 parent 7549b9e commit a099f0f
Show file tree
Hide file tree
Showing 47 changed files with 1,177 additions and 258 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ private IAuRaValidator CreateAuRaValidator(IBlockProcessor processor, IReadOnlyT
_api.EngineSigner,
_api.SpecProvider,
_api.GasPriceOracle,
_api.ReportingContractValidatorCache, chainSpecAuRa.PosdaoTransition, false)
_api.ReportingContractValidatorCache,
chainSpecAuRa.PosdaoTransition)
.CreateValidatorProcessor(chainSpecAuRa.Validators, _api.BlockTree.Head?.Header);

if (validator is IDisposable disposableValidator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public class BlockchainProcessor : IBlockchainProcessor, IBlockProcessingQueue
public int SoftMaxRecoveryQueueSizeInTx = 10000; // adjust based on tx or gas
public const int MaxProcessingQueueSize = 2000; // adjust based on tx or gas

/// <summary>
/// Per-thread flag (note the <c>[ThreadStatic]</c> attribute: every thread sees its own copy,
/// which starts as <c>false</c>). The processing thread created in <c>RunProcessing</c>
/// assigns it from <see cref="IsMainProcessor"/> before entering the processing loop.
/// </summary>
[ThreadStatic]
private static bool _isMainProcessingThread;
/// <summary>
/// Whether the calling thread is a processing thread started by a processor
/// whose <see cref="IsMainProcessor"/> is <c>true</c>.
/// </summary>
public static bool IsMainProcessingThread => _isMainProcessingThread;
/// <summary>
/// Marks this instance as the main processor; init-only, so it can be set only during
/// object initialization. The value is copied into the thread-static flag of the
/// processing thread this instance starts.
/// </summary>
public bool IsMainProcessor { get; init; }

public ITracerBag Tracers => _compositeBlockTracer;

private readonly IBlockProcessor _blockProcessor;
Expand Down Expand Up @@ -56,7 +61,7 @@ public class BlockchainProcessor : IBlockchainProcessor, IBlockProcessingQueue
private readonly CompositeBlockTracer _compositeBlockTracer = new();
private readonly Stopwatch _stopwatch = new();

public event EventHandler<IBlockchainProcessor.InvalidBlockEventArgs> InvalidBlock;
public event EventHandler<IBlockchainProcessor.InvalidBlockEventArgs>? InvalidBlock;

/// <summary>
///
Expand Down Expand Up @@ -256,6 +261,8 @@ private Task RunProcessing()

Thread thread = new(() =>
{
_isMainProcessingThread = IsMainProcessor;
try
{
RunProcessingLoop();
Expand Down
12 changes: 12 additions & 0 deletions src/Nethermind/Nethermind.Core/Extensions/TypeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,17 @@ public static bool CanBeAssignedNull(this Type type) =>

public static bool CannotBeAssignedNull(this Type type) =>
type.IsValueType && Nullable.GetUnderlyingType(type) is null;

/// <summary>
/// Returns the type name. If this is a generic type, appends
/// the list of generic type arguments between angle brackets,
/// e.g. <c>Dictionary&lt;String,List&lt;Int32&gt;&gt;</c>. Generic arguments
/// are formatted recursively with the same rules.
/// </summary>
/// <remarks>
/// Only <see cref="System.Reflection.MemberInfo.Name"/> is used, so namespaces and
/// declaring types of nested types are not part of the result.
/// </remarks>
/// <param name="type">The type.</param>
/// <returns>The type name, followed by its generic arguments when the type is generic.</returns>
public static string NameWithGenerics(this Type type)
{
    if (!type.IsGenericType)
    {
        return type.Name;
    }

    // A generic type's Name normally ends with a backtick and its arity ("List`1").
    // A non-generic type nested inside a generic one ("Dictionary`2+KeyCollection")
    // still reports IsGenericType == true, yet its Name carries no backtick; in that
    // case keep the whole name instead of calling Substring with a length of -1.
    string name = type.Name;
    int arityMarker = name.IndexOf('`');
    if (arityMarker >= 0)
    {
        name = name.Substring(0, arityMarker);
    }

    return $"{name}<{string.Join(",", type.GetGenericArguments().Select(NameWithGenerics))}>";
}
}
}
6 changes: 3 additions & 3 deletions src/Nethermind/Nethermind.Core/IKeyValueStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ public interface IKeyValueStore : IReadOnlyKeyValueStore
{
new byte[]? this[ReadOnlySpan<byte> key]
{
get => Get(key, ReadFlags.None);
set => Set(key, value, WriteFlags.None);
get => Get(key);
set => Set(key, value);
}

void Set(ReadOnlySpan<byte> key, byte[]? value, WriteFlags flags = WriteFlags.None);
}

public interface IReadOnlyKeyValueStore
{
byte[]? this[ReadOnlySpan<byte> key] => Get(key, ReadFlags.None);
byte[]? this[ReadOnlySpan<byte> key] => Get(key);

byte[]? Get(ReadOnlySpan<byte> key, ReadFlags flags = ReadFlags.None);
}
Expand Down
61 changes: 33 additions & 28 deletions src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
using Nethermind.State;
using Nethermind.State.Witnesses;
using Nethermind.Synchronization.ParallelSync;
using Nethermind.Synchronization.Trie;
using Nethermind.Synchronization.Witness;
using Nethermind.Trie;
using Nethermind.Trie.Pruning;
Expand Down Expand Up @@ -105,41 +106,47 @@ private Task InitBlockchain()
IKeyValueStore codeDb = getApi.DbProvider.CodeDb
.WitnessedBy(witnessCollector);

TrieStore trieStore;
IKeyValueStoreWithBatching stateWitnessedBy = setApi.MainStateDbWithCache.WitnessedBy(witnessCollector);
IPersistenceStrategy persistenceStrategy;
IPruningStrategy pruningStrategy;
if (pruningConfig.Mode.IsMemory())
{
IPersistenceStrategy persistenceStrategy = Persist.IfBlockOlderThan(pruningConfig.PersistenceInterval); // TODO: this should be based on time
persistenceStrategy = Persist.IfBlockOlderThan(pruningConfig.PersistenceInterval); // TODO: this should be based on time
if (pruningConfig.Mode.IsFull())
{
PruningTriggerPersistenceStrategy triggerPersistenceStrategy = new((IFullPruningDb)getApi.DbProvider!.StateDb, getApi.BlockTree!, getApi.LogManager);
getApi.DisposeStack.Push(triggerPersistenceStrategy);
persistenceStrategy = persistenceStrategy.Or(triggerPersistenceStrategy);
}

setApi.TrieStore = trieStore = new TrieStore(
stateWitnessedBy,
Prune.WhenCacheReaches(pruningConfig.CacheMb.MB()), // TODO: memory hint should define this
persistenceStrategy,
getApi.LogManager);

if (pruningConfig.Mode.IsFull())
{
IFullPruningDb fullPruningDb = (IFullPruningDb)getApi.DbProvider!.StateDb;
fullPruningDb.PruningStarted += (_, args) =>
{
cachedStateDb.PersistCache(args.Context);
trieStore.PersistCache(args.Context, args.Context.CancellationTokenSource.Token);
};
}
pruningStrategy = Prune.WhenCacheReaches(pruningConfig.CacheMb.MB()); // TODO: memory hint should define this
}
else
{
setApi.TrieStore = trieStore = new TrieStore(
stateWitnessedBy,
No.Pruning,
Persist.EveryBlock,
getApi.LogManager);
pruningStrategy = No.Pruning;
persistenceStrategy = Persist.EveryBlock;
}

TrieStore trieStore = new HealingTrieStore(
stateWitnessedBy,
pruningStrategy,
persistenceStrategy,
getApi.LogManager);
setApi.TrieStore = trieStore;

IWorldState worldState = setApi.WorldState = new HealingWorldState(
trieStore,
codeDb,
getApi.LogManager);

if (pruningConfig.Mode.IsFull())
{
IFullPruningDb fullPruningDb = (IFullPruningDb)getApi.DbProvider!.StateDb;
fullPruningDb.PruningStarted += (_, args) =>
{
cachedStateDb.PersistCache(args.Context);
trieStore.PersistCache(args.Context, args.Context.CancellationTokenSource.Token);
};
}

TrieStoreBoundaryWatcher trieStoreBoundaryWatcher = new(trieStore, _api.BlockTree!, _api.LogManager);
Expand All @@ -148,11 +155,6 @@ private Task InitBlockchain()

ITrieStore readOnlyTrieStore = setApi.ReadOnlyTrieStore = trieStore.AsReadOnly(cachedStateDb);

IWorldState worldState = setApi.WorldState = new WorldState(
trieStore,
codeDb,
getApi.LogManager);

ReadOnlyDbProvider readOnly = new(getApi.DbProvider, false);

IStateReader stateReader = setApi.StateReader = new StateReader(readOnlyTrieStore, readOnly.GetDb<IDb>(DbNames.Code), getApi.LogManager);
Expand Down Expand Up @@ -259,7 +261,10 @@ private Task InitBlockchain()
{
StoreReceiptsByDefault = initConfig.StoreReceipts,
DumpOptions = initConfig.AutoDump
});
})
{
IsMainProcessor = true
};

setApi.BlockProcessingQueue = blockchainProcessor;
setApi.BlockchainProcessor = blockchainProcessor;
Expand Down
34 changes: 24 additions & 10 deletions src/Nethermind/Nethermind.Init/Steps/InitializeNetwork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
using Nethermind.Synchronization.Peers;
using Nethermind.Synchronization.Reporting;
using Nethermind.Synchronization.SnapSync;
using Nethermind.Synchronization.StateSync;
using Nethermind.Synchronization.Trie;

namespace Nethermind.Init.Steps;

Expand Down Expand Up @@ -87,6 +89,7 @@ public async Task Execute(CancellationToken cancellationToken)
private async Task Initialize(CancellationToken cancellationToken)
{
if (_api.DbProvider is null) throw new StepDependencyException(nameof(_api.DbProvider));
if (_api.BlockTree is null) throw new StepDependencyException(nameof(_api.BlockTree));

if (_networkConfig.DiagTracerEnabled)
{
Expand All @@ -100,11 +103,11 @@ private async Task Initialize(CancellationToken cancellationToken)

CanonicalHashTrie cht = new CanonicalHashTrie(_api.DbProvider!.ChtDb);

ProgressTracker progressTracker = new(_api.BlockTree!, _api.DbProvider.StateDb, _api.LogManager, _syncConfig.SnapSyncAccountRangePartitionCount);
ProgressTracker progressTracker = new(_api.BlockTree, _api.DbProvider.StateDb, _api.LogManager, _syncConfig.SnapSyncAccountRangePartitionCount);
_api.SnapProvider = new SnapProvider(progressTracker, _api.DbProvider, _api.LogManager);

SyncProgressResolver syncProgressResolver = new(
_api.BlockTree!,
_api.BlockTree,
_api.ReceiptStorage!,
_api.DbProvider.StateDb,
_api.ReadOnlyTrieStore!,
Expand All @@ -118,12 +121,22 @@ private async Task Initialize(CancellationToken cancellationToken)
int maxPeersCount = _networkConfig.ActivePeersMaxCount;
int maxPriorityPeersCount = _networkConfig.PriorityPeersMaxCount;
Network.Metrics.PeerLimit = maxPeersCount;
SyncPeerPool apiSyncPeerPool = new(_api.BlockTree!, _api.NodeStatsManager!, _api.BetterPeerStrategy, _api.LogManager, maxPeersCount, maxPriorityPeersCount);
SyncPeerPool apiSyncPeerPool = new(_api.BlockTree, _api.NodeStatsManager!, _api.BetterPeerStrategy, _api.LogManager, maxPeersCount, maxPriorityPeersCount);

_api.SyncPeerPool = apiSyncPeerPool;
_api.PeerDifficultyRefreshPool = apiSyncPeerPool;
_api.DisposeStack.Push(_api.SyncPeerPool);

if (_api.TrieStore is HealingTrieStore healingTrieStore)
{
healingTrieStore.InitializeNetwork(new GetNodeDataTrieNodeRecovery(apiSyncPeerPool, _api.LogManager));
}

if (_api.WorldState is HealingWorldState healingWorldState)
{
healingWorldState.InitializeNetwork(new SnapTrieNodeRecovery(apiSyncPeerPool, _api.LogManager));
}

IEnumerable<ISynchronizationPlugin> synchronizationPlugins = _api.GetSynchronizationPlugins();
foreach (ISynchronizationPlugin plugin in synchronizationPlugins)
{
Expand All @@ -133,7 +146,7 @@ private async Task Initialize(CancellationToken cancellationToken)
_api.SyncModeSelector ??= CreateMultiSyncModeSelector(syncProgressResolver);
_api.TxGossipPolicy.Policies.Add(new SyncedTxGossipPolicy(_api.SyncModeSelector));

_api.EthSyncingInfo = new EthSyncingInfo(_api.BlockTree!, _api.ReceiptStorage!, _syncConfig, _api.SyncModeSelector, _api.LogManager);
_api.EthSyncingInfo = new EthSyncingInfo(_api.BlockTree, _api.ReceiptStorage!, _syncConfig, _api.SyncModeSelector, _api.LogManager);
_api.DisposeStack.Push(_api.SyncModeSelector);

_api.Pivot ??= new Pivot(_syncConfig);
Expand All @@ -144,7 +157,7 @@ private async Task Initialize(CancellationToken cancellationToken)

_api.BlockDownloaderFactory ??= new BlockDownloaderFactory(
_api.SpecProvider!,
_api.BlockTree!,
_api.BlockTree,
_api.ReceiptStorage!,
_api.BlockValidator!,
_api.SealValidator!,
Expand All @@ -155,7 +168,7 @@ private async Task Initialize(CancellationToken cancellationToken)
_api.Synchronizer ??= new Synchronizer(
_api.DbProvider,
_api.SpecProvider!,
_api.BlockTree!,
_api.BlockTree,
_api.ReceiptStorage!,
_api.SyncPeerPool,
_api.NodeStatsManager!,
Expand All @@ -171,9 +184,9 @@ private async Task Initialize(CancellationToken cancellationToken)
_api.DisposeStack.Push(_api.Synchronizer);

ISyncServer syncServer = _api.SyncServer = new SyncServer(
_api.TrieStore!,
_api.TrieStore!.AsKeyValueStore(),
_api.DbProvider.CodeDb,
_api.BlockTree!,
_api.BlockTree,
_api.ReceiptStorage!,
_api.BlockValidator!,
_api.SealValidator!,
Expand Down Expand Up @@ -214,8 +227,9 @@ await InitPeer().ContinueWith(initPeerTask =>
}
else if (_logger.IsDebug) _logger.Debug("Skipped enabling eth67 & eth68 capabilities");

if (_syncConfig.SnapSync && !stateSyncFinished)
if (_syncConfig.SnapSync)
{
// TODO: Should we keep snap capability even after finishing sync?
SnapCapabilitySwitcher snapCapabilitySwitcher = new(_api.ProtocolsManager, _api.SyncModeSelector, _api.LogManager);
snapCapabilitySwitcher.EnableSnapCapabilityUntilSynced();
}
Expand Down Expand Up @@ -513,7 +527,7 @@ private async Task InitPeer()
ISyncServer syncServer = _api.SyncServer!;
ForkInfo forkInfo = new(_api.SpecProvider!, syncServer.Genesis.Hash!);

ProtocolValidator protocolValidator = new(_api.NodeStatsManager!, _api.BlockTree!, forkInfo, _api.LogManager);
ProtocolValidator protocolValidator = new(_api.NodeStatsManager!, _api.BlockTree, forkInfo, _api.LogManager);
PooledTxsRequestor pooledTxsRequestor = new(_api.TxPool!);
_api.ProtocolsManager = new ProtocolsManager(
_api.SyncPeerPool!,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading.Tasks;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Logging;
using Nethermind.Network.Contract.P2P;
using Nethermind.Network.P2P.EventArg;
Expand All @@ -17,6 +18,7 @@
using Nethermind.State.Snap;
using Nethermind.Stats;
using Nethermind.Stats.Model;
using Nethermind.Trie;

namespace Nethermind.Network.P2P.Subprotocols.Snap
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public SnapCapabilitySwitcher(IProtocolsManager? protocolsManager, ISyncModeSele
{
_protocolsManager = protocolsManager ?? throw new ArgumentNullException(nameof(protocolsManager));
_syncModeSelector = syncModeSelector ?? throw new ArgumentNullException(nameof(syncModeSelector));
_logger = logManager.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager));
_logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager));
}

/// <summary>
Expand Down
14 changes: 14 additions & 0 deletions src/Nethermind/Nethermind.State/IStorageTreeFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Logging;
using Nethermind.Trie.Pruning;

namespace Nethermind.State;

/// <summary>
/// Factory for the <see cref="StorageTree"/> instances used by the storage provider,
/// allowing the concrete tree type to be substituted.
/// </summary>
public interface IStorageTreeFactory
{
/// <summary>
/// Creates the storage tree for the given account.
/// </summary>
/// <param name="address">Address of the account whose storage tree is requested.</param>
/// <param name="trieStore">Trie store backing the tree.</param>
/// <param name="storageRoot">Root hash of the account's storage trie.</param>
/// <param name="stateRoot">
/// State root supplied by the caller alongside the storage root.
/// NOTE(review): presumably the state root the account belongs to; confirm against implementations.
/// </param>
/// <param name="logManager">Log manager handed to the created tree.</param>
/// <returns>A storage tree rooted at <paramref name="storageRoot"/>.</returns>
StorageTree Create(Address address, ITrieStore trieStore, Keccak storageRoot, Keccak stateRoot, ILogManager? logManager);
}
16 changes: 14 additions & 2 deletions src/Nethermind/Nethermind.State/PersistentStorageProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,27 @@ internal class PersistentStorageProvider : PartialStorageProviderBase
private readonly ITrieStore _trieStore;
private readonly StateProvider _stateProvider;
private readonly ILogManager? _logManager;
internal readonly IStorageTreeFactory _storageTreeFactory;
private readonly ResettableDictionary<Address, StorageTree> _storages = new();

/// <summary>
/// EIP-1283
/// </summary>
private readonly ResettableDictionary<StorageCell, byte[]> _originalValues = new();

private readonly ResettableHashSet<StorageCell> _committedThisRound = new();

public PersistentStorageProvider(ITrieStore? trieStore, StateProvider? stateProvider, ILogManager? logManager)
public PersistentStorageProvider(ITrieStore? trieStore, StateProvider? stateProvider, ILogManager? logManager, IStorageTreeFactory? storageTreeFactory = null)
: base(logManager)
{
_trieStore = trieStore ?? throw new ArgumentNullException(nameof(trieStore));
_stateProvider = stateProvider ?? throw new ArgumentNullException(nameof(stateProvider));
_logManager = logManager ?? throw new ArgumentNullException(nameof(logManager));
_storageTreeFactory = storageTreeFactory ?? new StorageTreeFactory();
}

/// <summary>
/// State root that is forwarded to the storage tree factory whenever a storage tree
/// is created for an account. Starts out as <c>null</c> (suppressed with <c>null!</c>).
/// NOTE(review): the code that assigns it is not visible here; confirm it is set before use.
/// </summary>
public Keccak StateRoot { get; set; } = null!;

/// <summary>
/// Reset the storage state
/// </summary>
Expand Down Expand Up @@ -220,7 +226,7 @@ private StorageTree GetOrCreateStorage(Address address)
{
if (!_storages.ContainsKey(address))
{
StorageTree storageTree = new(_trieStore, _stateProvider.GetStorageRoot(address), _logManager);
StorageTree storageTree = _storageTreeFactory.Create(address, _trieStore, _stateProvider.GetStorageRoot(address), StateRoot, _logManager);
return _storages[address] = storageTree;
}

Expand Down Expand Up @@ -281,5 +287,11 @@ public override void ClearStorage(Address address)
// TODO: how does it work with pruning?
_storages[address] = new StorageTree(_trieStore, Keccak.EmptyTreeHash, _logManager);
}

/// <summary>
/// Default factory: builds a plain <see cref="StorageTree"/> from the trie store,
/// storage root and log manager. The account address and state root are not used.
/// </summary>
private class StorageTreeFactory : IStorageTreeFactory
{
    /// <inheritdoc />
    public StorageTree Create(Address address, ITrieStore trieStore, Keccak storageRoot, Keccak stateRoot, ILogManager? logManager)
    {
        StorageTree tree = new StorageTree(trieStore, storageRoot, logManager);
        return tree;
    }
}
}
}
Loading

0 comments on commit a099f0f

Please sign in to comment.