diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index 831429a02f6..cd4ceb1d07f 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -714,6 +714,7 @@ public BesuController build() { final DefaultSynchronizer synchronizer = createSynchronizer( protocolSchedule, + currentProtocolSpecSupplier, worldStateStorageCoordinator, protocolContext, ethContext, @@ -842,6 +843,7 @@ private TrieLogPruner createTrieLogPruner( * Create synchronizer synchronizer. * * @param protocolSchedule the protocol schedule + * @param currentProtocolSpecSupplier the protocol spec supplier * @param worldStateStorageCoordinator the world state storage * @param protocolContext the protocol context * @param ethContext the eth context @@ -853,6 +855,7 @@ private TrieLogPruner createTrieLogPruner( */ protected DefaultSynchronizer createSynchronizer( final ProtocolSchedule protocolSchedule, + final Supplier currentProtocolSpecSupplier, final WorldStateStorageCoordinator worldStateStorageCoordinator, final ProtocolContext protocolContext, final EthContext ethContext, @@ -864,6 +867,7 @@ protected DefaultSynchronizer createSynchronizer( return new DefaultSynchronizer( syncConfig, protocolSchedule, + currentProtocolSpecSupplier, protocolContext, worldStateStorageCoordinator, ethProtocolManager.getBlockBroadcaster(), diff --git a/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java index 75abca6a574..85597d3cf03 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java @@ -50,6 +50,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.storage.StorageProvider; import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration; import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; @@ -65,6 +66,7 @@ import java.util.Map; import java.util.Optional; import java.util.function.Consumer; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -220,6 +222,7 @@ protected PluginServiceFactory createAdditionalPluginServices( @Override protected DefaultSynchronizer createSynchronizer( final ProtocolSchedule protocolSchedule, + final Supplier currentProtocolSpecSupplier, final WorldStateStorageCoordinator worldStateStorageCoordinator, final ProtocolContext protocolContext, final EthContext ethContext, @@ -231,6 +234,7 @@ protected DefaultSynchronizer createSynchronizer( DefaultSynchronizer sync = super.createSynchronizer( protocolSchedule, + currentProtocolSpecSupplier, worldStateStorageCoordinator, protocolContext, ethContext, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetBodiesFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetBodiesFromPeerTask.java new file mode 100644 index 00000000000..507a98369a6 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetBodiesFromPeerTask.java @@ -0,0 +1,139 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask.task; + +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockBody; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.EthProtocol; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask; +import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage; +import org.hyperledger.besu.ethereum.eth.messages.GetBlockBodiesMessage; +import org.hyperledger.besu.ethereum.mainnet.BodyValidation; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Predicate; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GetBodiesFromPeerTask implements PeerTask> { + + private static final Logger LOG = LoggerFactory.getLogger(GetBodiesFromPeerTask.class); + + private final List blockHeaders; + private final ProtocolSchedule protocolSchedule; + + private final long requiredBlockchainHeight; + private final List blocks = new ArrayList<>(); + private final boolean isPoS; + + public GetBodiesFromPeerTask( + final List blockHeaders, final ProtocolSchedule protocolSchedule) { + if (blockHeaders == null || blockHeaders.isEmpty()) { + throw new IllegalArgumentException("Block headers must not be empty"); + } + + this.blockHeaders = blockHeaders; + this.protocolSchedule = protocolSchedule; + + this.requiredBlockchainHeight = + blockHeaders.stream() + .mapToLong(BlockHeader::getNumber) + .max() + .orElse(BlockHeader.GENESIS_BLOCK_NUMBER); + this.isPoS = protocolSchedule.getByBlockHeader(blockHeaders.getLast()).isPoS(); + } + + @Override + public SubProtocol getSubProtocol() { + return EthProtocol.get(); + } + + @Override + public MessageData getRequestMessage() { + return GetBlockBodiesMessage.create( + blockHeaders.stream().map(BlockHeader::getBlockHash).toList()); + } + + @Override + public List processResponse(final MessageData messageData) + throws InvalidPeerTaskResponseException { + // Blocks returned by this method are in the same order as the headers, but might not be + // complete + if (messageData == null) { + throw new InvalidPeerTaskResponseException(); + } + final BlockBodiesMessage blocksMessage = BlockBodiesMessage.readFrom(messageData); + final List blockBodies = blocksMessage.bodies(protocolSchedule); + if (blockBodies.isEmpty() || blockBodies.size() > blockHeaders.size()) { + throw new InvalidPeerTaskResponseException(); + } + + for (int i = 0; i < blockBodies.size(); i++) { + final BlockBody blockBody = blockBodies.get(i); + final BlockHeader blockHeader = blockHeaders.get(i); + if (!blockBodyMatchesBlockHeader(blockBody, blockHeader)) { + LOG.atDebug().setMessage("Received block body does not match block header").log(); + throw new InvalidPeerTaskResponseException(); + } + + blocks.add(new Block(blockHeader, blockBody)); + } + return blocks; + } + + private boolean blockBodyMatchesBlockHeader( + final BlockBody blockBody, final BlockHeader blockHeader) { + // this method validates that the block body matches the block header by calculating the roots + // of the block body and comparing them to the roots in the block header + if (!BodyValidation.transactionsRoot(blockBody.getTransactions()) + .equals(blockHeader.getTransactionsRoot())) { + return false; + } + if (!BodyValidation.ommersHash(blockBody.getOmmers()).equals(blockHeader.getOmmersHash())) { + return false; + } + if (blockBody.getWithdrawals().isPresent()) { + if (blockHeader.getWithdrawalsRoot().isEmpty()) { + return false; + } + if (!BodyValidation.withdrawalsRoot(blockBody.getWithdrawals().get()) + .equals(blockHeader.getWithdrawalsRoot().get())) { + return false; + } + } else if (blockHeader.getWithdrawalsRoot().isPresent()) { + return false; + } + return true; + } + + @Override + public Predicate getPeerRequirementFilter() { + return (ethPeer) -> + isPoS || ethPeer.chainState().getEstimatedHeight() >= requiredBlockchainHeight; + } + + @Override + public boolean isSuccess(final List result) { + return !result.isEmpty(); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index 66684ab7873..24007505e15 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -35,6 +35,7 @@ import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.storage.StorageProvider; import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.BonsaiWorldStateProvider; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator; @@ -79,6 +80,7 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi public DefaultSynchronizer( final SynchronizerConfiguration syncConfig, final ProtocolSchedule protocolSchedule, + final Supplier currentProtocolSpecSupplier, final ProtocolContext protocolContext, final WorldStateStorageCoordinator worldStateStorageCoordinator, final BlockBroadcaster blockBroadcaster, @@ -136,6 +138,7 @@ public DefaultSynchronizer( syncState, metricsSystem, terminationCondition, + peerTaskExecutor, syncDurationMetrics)); if (SyncMode.FAST.equals(syncConfig.getSyncMode())) { @@ -146,6 +149,7 @@ public DefaultSynchronizer( syncConfig, dataDirectory, protocolSchedule, + currentProtocolSpecSupplier, protocolContext, metricsSystem, ethContext, @@ -163,6 +167,7 @@ public DefaultSynchronizer( syncConfig, dataDirectory, protocolSchedule, + currentProtocolSpecSupplier, protocolContext, metricsSystem, ethContext, @@ -180,6 +185,7 @@ public DefaultSynchronizer( syncConfig, dataDirectory, protocolSchedule, + currentProtocolSpecSupplier, protocolContext, metricsSystem, ethContext, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadBodiesStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadBodiesStep.java index 26fc7b544ce..3da93d87b62 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadBodiesStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadBodiesStep.java @@ -17,7 +17,9 @@ import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.tasks.CompleteBlocksTask; +import org.hyperledger.besu.ethereum.eth.sync.tasks.CompleteBlocksWithPeerTask; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -31,19 +33,41 @@ public class DownloadBodiesStep private final ProtocolSchedule protocolSchedule; private final EthContext ethContext; private final MetricsSystem metricsSystem; + private final SynchronizerConfiguration synchronizerConfiguration; + private final PeerTaskExecutor peerTaskExecutor; public DownloadBodiesStep( final ProtocolSchedule protocolSchedule, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, + final SynchronizerConfiguration synchronizerConfiguration, final MetricsSystem metricsSystem) { this.protocolSchedule = protocolSchedule; this.ethContext = ethContext; + this.peerTaskExecutor = peerTaskExecutor; + this.synchronizerConfiguration = synchronizerConfiguration; this.metricsSystem = metricsSystem; } @Override public CompletableFuture> apply(final List blockHeaders) { - return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, blockHeaders, metricsSystem) - .run(); + if (synchronizerConfiguration.isPeerTaskSystemEnabled()) { + return ethContext + .getScheduler() + .scheduleServiceTask(() -> getBodiesWithPeerTaskSystem(blockHeaders)); + } else { + return CompleteBlocksTask.forHeaders( + protocolSchedule, ethContext, blockHeaders, metricsSystem) + .run(); + } + } + + private CompletableFuture> getBodiesWithPeerTaskSystem( + final List headers) { + + final CompleteBlocksWithPeerTask completeBlocksWithPeerTask = + new CompleteBlocksWithPeerTask(protocolSchedule, headers, peerTaskExecutor); + final List blocks = completeBlocksWithPeerTask.getBlocks(); + return CompletableFuture.completedFuture(blocks); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java index 30134d9f6c5..e8a6fe2804f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java @@ -34,6 +34,7 @@ import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions; import org.hyperledger.besu.ethereum.trie.CompactEncoding; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator; @@ -44,6 +45,7 @@ import java.nio.file.Path; import java.time.Clock; import java.util.Optional; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +61,7 @@ public static Optional> createCheckpointDownloader( final SynchronizerConfiguration syncConfig, final Path dataDirectory, final ProtocolSchedule protocolSchedule, + final Supplier currentProtocolSpecSupplier, final ProtocolContext protocolContext, final MetricsSystem metricsSystem, final EthContext ethContext, @@ -110,6 +113,7 @@ public static Optional> createCheckpointDownloader( syncConfig, worldStateStorageCoordinator, protocolSchedule, + currentProtocolSpecSupplier, protocolContext, ethContext, peerTaskExecutor, @@ -128,6 +132,7 @@ public static Optional> createCheckpointDownloader( syncConfig, worldStateStorageCoordinator, protocolSchedule, + currentProtocolSpecSupplier, protocolContext, ethContext, peerTaskExecutor, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncActions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncActions.java index 61b997e6c53..d8bdaaf483f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncActions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointSyncActions.java @@ -24,15 +24,19 @@ import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator; import org.hyperledger.besu.metrics.SyncDurationMetrics; import org.hyperledger.besu.plugin.services.MetricsSystem; +import java.util.function.Supplier; + public class CheckpointSyncActions extends FastSyncActions { public CheckpointSyncActions( final SynchronizerConfiguration syncConfig, final WorldStateStorageCoordinator worldStateStorageCoordinator, final ProtocolSchedule protocolSchedule, + final Supplier currentProtocolSpecSupplier, final ProtocolContext protocolContext, final EthContext ethContext, final PeerTaskExecutor peerTaskExecutor, @@ -43,6 +47,7 @@ public CheckpointSyncActions( syncConfig, worldStateStorageCoordinator, protocolSchedule, + currentProtocolSpecSupplier, protocolContext, ethContext, peerTaskExecutor, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java index 58a64bd562a..099b4c0d2a8 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActions.java @@ -27,6 +27,7 @@ import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.sync.tasks.RetryingGetHeaderFromPeerByHashTask; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator; import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.metrics.SyncDurationMetrics; @@ -47,6 +48,7 @@ public class FastSyncActions { protected final SynchronizerConfiguration syncConfig; protected final WorldStateStorageCoordinator worldStateStorageCoordinator; protected final ProtocolSchedule protocolSchedule; + protected final Supplier currentProtocolSpecSupplier; protected final ProtocolContext protocolContext; protected final EthContext ethContext; protected final PeerTaskExecutor peerTaskExecutor; @@ -60,6 +62,7 @@ public FastSyncActions( final SynchronizerConfiguration syncConfig, final WorldStateStorageCoordinator worldStateStorageCoordinator, final ProtocolSchedule protocolSchedule, + final Supplier currentProtocolSpecSupplier, final ProtocolContext protocolContext, final EthContext ethContext, final PeerTaskExecutor peerTaskExecutor, @@ -69,6 +72,7 @@ public FastSyncActions( this.syncConfig = syncConfig; this.worldStateStorageCoordinator = worldStateStorageCoordinator; this.protocolSchedule = protocolSchedule; + this.currentProtocolSpecSupplier = currentProtocolSpecSupplier; this.protocolContext = protocolContext; this.ethContext = ethContext; this.peerTaskExecutor = peerTaskExecutor; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java index 67085252e8f..12ee3e45cd2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java @@ -143,7 +143,8 @@ public Pipeline createDownloadPipelineForSyncTarget(final SyncT final RangeHeadersValidationStep validateHeadersJoinUpStep = new RangeHeadersValidationStep(protocolSchedule, protocolContext, detachedValidationPolicy); final DownloadBodiesStep downloadBodiesStep = - new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem); + new DownloadBodiesStep( + protocolSchedule, ethContext, peerTaskExecutor, syncConfig, metricsSystem); final DownloadReceiptsStep downloadReceiptsStep = new DownloadReceiptsStep( protocolSchedule, ethContext, peerTaskExecutor, syncConfig, metricsSystem); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java index 1d775cc80fd..0b6fad7bb62 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java @@ -28,6 +28,7 @@ import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator; import org.hyperledger.besu.metrics.BesuMetricCategory; @@ -41,6 +42,7 @@ import java.nio.file.Path; import java.time.Clock; import java.util.Optional; +import java.util.function.Supplier; import java.util.stream.Stream; import org.slf4j.Logger; @@ -57,6 +59,7 @@ public static Optional> create( final SynchronizerConfiguration syncConfig, final Path dataDirectory, final ProtocolSchedule protocolSchedule, + final Supplier currentProtocolSpecSupplier, final ProtocolContext protocolContext, final MetricsSystem metricsSystem, final EthContext ethContext, @@ -126,6 +129,7 @@ public static Optional> create( syncConfig, worldStateStorageCoordinator, protocolSchedule, + currentProtocolSpecSupplier, protocolContext, ethContext, peerTaskExecutor, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloader.java index 3a0f6edb086..7faacedfb2a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloader.java @@ -16,6 +16,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.PipelineChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; @@ -35,7 +36,8 @@ public static ChainDownloader create( final SyncState syncState, final MetricsSystem metricsSystem, final SyncTerminationCondition terminationCondition, - final SyncDurationMetrics syncDurationMetrics) { + final SyncDurationMetrics syncDurationMetrics, + final PeerTaskExecutor peerTaskExecutor) { final FullSyncTargetManager syncTargetManager = new FullSyncTargetManager( @@ -54,6 +56,7 @@ public static ChainDownloader create( protocolSchedule, protocolContext, ethContext, + peerTaskExecutor, metricsSystem, terminationCondition), ethContext.getScheduler(), diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java index b822236b8f0..bde94331d52 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java @@ -19,6 +19,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.DownloadBodiesStep; import org.hyperledger.besu.ethereum.eth.sync.DownloadHeadersStep; import org.hyperledger.besu.ethereum.eth.sync.DownloadPipelineFactory; @@ -53,21 +54,25 @@ public class FullSyncDownloadPipelineFactory implements DownloadPipelineFactory () -> HeaderValidationMode.DETACHED_ONLY; private final BetterSyncTargetEvaluator betterSyncTargetEvaluator; private final SyncTerminationCondition fullSyncTerminationCondition; + private final PeerTaskExecutor peerTaskExecutor; public FullSyncDownloadPipelineFactory( final SynchronizerConfiguration syncConfig, final ProtocolSchedule protocolSchedule, final ProtocolContext protocolContext, final EthContext ethContext, + final PeerTaskExecutor peerTaskExecutor, final MetricsSystem metricsSystem, final SyncTerminationCondition syncTerminationCondition) { this.syncConfig = syncConfig; this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; + this.peerTaskExecutor = peerTaskExecutor; this.metricsSystem = metricsSystem; this.fullSyncTerminationCondition = syncTerminationCondition; - betterSyncTargetEvaluator = new BetterSyncTargetEvaluator(syncConfig, ethContext.getEthPeers()); + this.betterSyncTargetEvaluator = + new BetterSyncTargetEvaluator(syncConfig, ethContext.getEthPeers()); } @Override @@ -104,7 +109,8 @@ public Pipeline createDownloadPipelineForSyncTarget(final SyncTarget target) final RangeHeadersValidationStep validateHeadersJoinUpStep = new RangeHeadersValidationStep(protocolSchedule, protocolContext, detachedValidationPolicy); final DownloadBodiesStep downloadBodiesStep = - new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem); + new DownloadBodiesStep( + protocolSchedule, ethContext, peerTaskExecutor, syncConfig, metricsSystem); final ExtractTxSignaturesStep extractTxSignaturesStep = new ExtractTxSignaturesStep(); final FullImportBlockStep importBlockStep = new FullImportBlockStep( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java index 8f1aca792c3..97bf38c95ec 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java @@ -16,6 +16,7 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements; @@ -45,6 +46,7 @@ public FullSyncDownloader( final SyncState syncState, final MetricsSystem metricsSystem, final SyncTerminationCondition terminationCondition, + final PeerTaskExecutor peerTaskExecutor, final SyncDurationMetrics syncDurationMetrics) { this.syncConfig = syncConfig; this.protocolContext = protocolContext; @@ -59,7 +61,8 @@ public FullSyncDownloader( syncState, metricsSystem, terminationCondition, - syncDurationMetrics); + syncDurationMetrics, + peerTaskExecutor); } public CompletableFuture start() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java index 6c5ce0b04e9..423bb6b8790 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java @@ -31,6 +31,7 @@ import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldStateDownloader; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions; import org.hyperledger.besu.ethereum.trie.CompactEncoding; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator; @@ -41,6 +42,7 @@ import java.nio.file.Path; import java.time.Clock; import java.util.Optional; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +57,7 @@ public static Optional> createSnapDownloader( final SynchronizerConfiguration syncConfig, final Path dataDirectory, final ProtocolSchedule protocolSchedule, + final Supplier currentProtocolSpecSupplier, final ProtocolContext protocolContext, final MetricsSystem metricsSystem, final EthContext ethContext, @@ -121,6 +124,7 @@ public static Optional> createSnapDownloader( syncConfig, worldStateStorageCoordinator, protocolSchedule, + currentProtocolSpecSupplier, protocolContext, ethContext, peerTaskExecutor, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksWithPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksWithPeerTask.java new file mode 100644 index 00000000000..b38143f1aae --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksWithPeerTask.java @@ -0,0 +1,129 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.tasks; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockBody; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTask; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Given a set of headers, "completes" them by repeatedly requesting additional data (bodies) needed + * to create the blocks that correspond to the supplied headers. + */ +public class CompleteBlocksWithPeerTask { + private static final Logger LOG = LoggerFactory.getLogger(CompleteBlocksWithPeerTask.class); + + private final ProtocolSchedule protocolSchedule; + private final List headersToGet = new ArrayList<>(); + private final PeerTaskExecutor peerTaskExecutor; + + private final Block[] result; + private final int resultSize; + private int nextIndex = 0; + private int remainingBlocks; + + public CompleteBlocksWithPeerTask( + final ProtocolSchedule protocolSchedule, + final List headers, + final PeerTaskExecutor peerTaskExecutor) { + checkArgument(!headers.isEmpty(), "Must supply a non-empty headers list"); + this.protocolSchedule = protocolSchedule; + this.peerTaskExecutor = peerTaskExecutor; + + resultSize = headers.size(); + result = new Block[resultSize]; + remainingBlocks = resultSize; + + for (int i = 0; i < resultSize; i++) { + final BlockHeader header = headers.get(i); + if (BlockHeader.hasEmptyBlock(header)) { + final Block emptyBlock = + new Block(header, createEmptyBodyBasedOnProtocolSchedule(protocolSchedule, header)); + result[i] = emptyBlock; + remainingBlocks--; + } else { + headersToGet.add(header); + } + } + this.nextIndex = findNextIndex(0); + } + + private BlockBody createEmptyBodyBasedOnProtocolSchedule( + final ProtocolSchedule protocolSchedule, final BlockHeader header) { + return new BlockBody( + Collections.emptyList(), + Collections.emptyList(), + isWithdrawalsEnabled(protocolSchedule, header) + ? Optional.of(Collections.emptyList()) + : Optional.empty()); + } + + private boolean isWithdrawalsEnabled( + final ProtocolSchedule protocolSchedule, final BlockHeader header) { + return protocolSchedule.getByBlockHeader(header).getWithdrawalsProcessor().isPresent(); + } + + public List getBlocks() { + while (remainingBlocks > 0) { + LOG.atDebug() + .setMessage("Requesting {} bodies from peer") + .addArgument(headersToGet.size()) + .log(); + final GetBodiesFromPeerTask task = new GetBodiesFromPeerTask(headersToGet, protocolSchedule); + final PeerTaskExecutorResult> executionResult = peerTaskExecutor.execute(task); + if (executionResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS + && executionResult.result().isPresent()) { + final List blockList = executionResult.result().get(); + LOG.atDebug() + .setMessage("Received {} bodies out of {} from peer") + .addArgument(blockList.size()) + .addArgument(headersToGet.size()) + .log(); + blockList.forEach( + block -> { + remainingBlocks--; + result[nextIndex] = block; + headersToGet.removeFirst(); + nextIndex = findNextIndex(nextIndex + 1); + }); + } + } + return List.of(result); + } + + private int findNextIndex(final int startIndex) { + for (int i = startIndex; i < resultSize; i++) { + if (result[i] == null) { + return i; + } + } + return -1; // This only happens when we have finished processing all headers + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetBodiesFromPeerTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetBodiesFromPeerTaskTest.java new file mode 100644 index 00000000000..90a97feadbc --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetBodiesFromPeerTaskTest.java @@ -0,0 +1,208 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.peertask.task; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockBody; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.Transaction; +import org.hyperledger.besu.ethereum.core.encoding.EncodingContext; +import org.hyperledger.besu.ethereum.core.encoding.TransactionDecoder; +import org.hyperledger.besu.ethereum.eth.EthProtocol; +import org.hyperledger.besu.ethereum.eth.manager.ChainState; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException; +import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage; +import org.hyperledger.besu.ethereum.eth.messages.EthPV62; +import org.hyperledger.besu.ethereum.eth.messages.GetBlockBodiesMessage; +import org.hyperledger.besu.ethereum.mainnet.BodyValidation; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; +import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.lang3.StringUtils; +import org.apache.tuweni.bytes.Bytes; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class GetBodiesFromPeerTaskTest { + + private static final String FRONTIER_TX_RLP = + "0xf901fc8032830138808080b901ae60056013565b6101918061001d6000396000f35b3360008190555056006001600060e060020a6000350480630a874df61461003a57806341c0e1b514610058578063a02b161e14610066578063dbbdf0831461007757005b610045600435610149565b80600160a060020a031660005260206000f35b610060610161565b60006000f35b6100716004356100d4565b60006000f35b61008560043560243561008b565b60006000f35b600054600160a060020a031632600160a060020a031614156100ac576100b1565b6100d0565b8060018360005260205260406000208190555081600060005260206000a15b5050565b600054600160a060020a031633600160a060020a031614158015610118575033600160a060020a0316600182600052602052604060002054600160a060020a031614155b61012157610126565b610146565b600060018260005260205260406000208190555080600060005260206000a15b50565b60006001826000526020526040600020549050919050565b600054600160a060020a031633600160a060020a0316146101815761018f565b600054600160a060020a0316ff5b561ca0c5689ed1ad124753d54576dfb4b571465a41900a1dff4058d8adf16f752013d0a01221cbd70ec28c94a3b55ec771bcbc70778d6ee0b51ca7ea9514594c861b1884"; + + private static final Transaction TX = + TransactionDecoder.decodeRLP( + new BytesValueRLPInput(Bytes.fromHexString(FRONTIER_TX_RLP), false), + EncodingContext.BLOCK_BODY); + public static final List TRANSACTION_LIST = List.of(TX); + public static final BlockBody BLOCK_BODY = + new BlockBody(TRANSACTION_LIST, Collections.emptyList(), Optional.empty()); + private static ProtocolSchedule protocolSchedule; + + @BeforeAll + public static void setup() { + protocolSchedule = mock(ProtocolSchedule.class); + final ProtocolSpec protocolSpec = mock(ProtocolSpec.class); + when(protocolSpec.isPoS()).thenReturn(true); + when(protocolSchedule.getByBlockHeader(Mockito.any())).thenReturn(protocolSpec); + } + + @Test + public void testGetSubProtocol() { + + GetBodiesFromPeerTask task = + new GetBodiesFromPeerTask(List.of(mockBlockHeader(0)), protocolSchedule); + Assertions.assertEquals(EthProtocol.get(), task.getSubProtocol()); + } + + @Test + public void testGetRequestMessage() { + GetBodiesFromPeerTask task = + new GetBodiesFromPeerTask( + List.of(mockBlockHeader(1), mockBlockHeader(2), mockBlockHeader(3)), protocolSchedule); + + MessageData messageData = task.getRequestMessage(); + GetBlockBodiesMessage getBlockBodiesMessage = GetBlockBodiesMessage.readFrom(messageData); + + Assertions.assertEquals(EthPV62.GET_BLOCK_BODIES, getBlockBodiesMessage.getCode()); + Iterable hashesInMessage = getBlockBodiesMessage.hashes(); + List expectedHashes = + List.of( + Hash.fromHexString(StringUtils.repeat("00", 31) + "11"), + Hash.fromHexString(StringUtils.repeat("00", 31) + "21"), + Hash.fromHexString(StringUtils.repeat("00", 31) + "31")); + List actualHashes = new ArrayList<>(); + hashesInMessage.forEach(actualHashes::add); + + Assertions.assertEquals(3, actualHashes.size()); + Assertions.assertEquals( + expectedHashes.stream().sorted().toList(), actualHashes.stream().sorted().toList()); + } + + @Test + public void testParseResponseWithNullResponseMessage() { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> new GetBodiesFromPeerTask(Collections.emptyList(), protocolSchedule)); + } + + @Test + public void testParseResponseForInvalidResponse() { + GetBodiesFromPeerTask task = + new GetBodiesFromPeerTask(List.of(mockBlockHeader(1)), protocolSchedule); + // body does not match header + BlockBodiesMessage bodiesMessage = BlockBodiesMessage.create(List.of(BLOCK_BODY)); + + Assertions.assertThrows( + InvalidPeerTaskResponseException.class, () -> task.processResponse(bodiesMessage)); + } + + @Test + public void testParseResponse() throws InvalidPeerTaskResponseException { + final BlockHeader nonEmptyBlockHeaderMock = + getNonEmptyBlockHeaderMock(BodyValidation.transactionsRoot(TRANSACTION_LIST).toString()); + + GetBodiesFromPeerTask task = + new GetBodiesFromPeerTask(List.of(nonEmptyBlockHeaderMock), protocolSchedule); + + final BlockBodiesMessage blockBodiesMessage = BlockBodiesMessage.create(List.of(BLOCK_BODY)); + + List result = task.processResponse(blockBodiesMessage); + + assertThat(result.size()).isEqualTo(1); + assertThat(result.getFirst().getBody().getTransactions()).isEqualTo(TRANSACTION_LIST); + } + + @Test + public void testGetPeerRequirementFilter() { + BlockHeader blockHeader1 = mockBlockHeader(1); + BlockHeader blockHeader2 = mockBlockHeader(2); + BlockHeader blockHeader3 = mockBlockHeader(3); + + GetBodiesFromPeerTask task = + new GetBodiesFromPeerTask( + List.of(blockHeader1, blockHeader2, blockHeader3), protocolSchedule); + + EthPeer successfulCandidate = mockPeer(EthProtocol.NAME, 5); + + Assertions.assertTrue(task.getPeerRequirementFilter().test(successfulCandidate)); + } + + @Test + public void testIsSuccessForPartialSuccess() { + GetBodiesFromPeerTask task = + new GetBodiesFromPeerTask(List.of(mockBlockHeader(1)), protocolSchedule); + + Assertions.assertFalse(task.isSuccess(Collections.emptyList())); + } + + @Test + public void testIsSuccessForFullSuccess() { + GetBodiesFromPeerTask task = + new GetBodiesFromPeerTask(List.of(mockBlockHeader(1)), protocolSchedule); + + final List blockHeaders = List.of(mock(Block.class)); + + Assertions.assertTrue(task.isSuccess(blockHeaders)); + } + + private BlockHeader mockBlockHeader(final long blockNumber) { + BlockHeader blockHeader = Mockito.mock(BlockHeader.class); + Mockito.when(blockHeader.getNumber()).thenReturn(blockNumber); + // second to last hex digit indicates the blockNumber, last hex digit indicates the usage of the + // hash + Mockito.when(blockHeader.getHash()) + .thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + blockNumber + "1")); + Mockito.when(blockHeader.getBlockHash()) + .thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + blockNumber + "1")); + Mockito.when(blockHeader.getReceiptsRoot()) + .thenReturn(Hash.fromHexString(StringUtils.repeat("00", 31) + blockNumber + "2")); + + return blockHeader; + } + + private static BlockHeader getNonEmptyBlockHeaderMock(final String transactionsRootHexString) { + final BlockHeader blockHeader = mock(BlockHeader.class); + when(blockHeader.getTransactionsRoot()) + .thenReturn(Hash.fromHexStringLenient(transactionsRootHexString)); + when(blockHeader.getOmmersHash()).thenReturn(Hash.EMPTY_LIST_HASH); + when(blockHeader.getWithdrawalsRoot()).thenReturn(Optional.empty()); + return blockHeader; + } + + private EthPeer mockPeer(final String protocol, final long chainHeight) { + EthPeer ethPeer = Mockito.mock(EthPeer.class); + ChainState chainState = Mockito.mock(ChainState.class); + + Mockito.when(ethPeer.getProtocolName()).thenReturn(protocol); + Mockito.when(ethPeer.chainState()).thenReturn(chainState); + Mockito.when(chainState.getEstimatedHeight()).thenReturn(chainHeight); + + return ethPeer; + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java index bc493ebd036..924780fe572 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java @@ -113,6 +113,7 @@ public void shouldThrowIfSyncModeChangedWhileFastSyncIncomplete( syncConfig, dataDirectory, protocolSchedule, + () -> null, protocolContext, metricsSystem, ethContext, @@ -139,6 +140,7 @@ public void shouldNotThrowIfSyncModeChangedWhileFastSyncComplete( syncConfig, dataDirectory, protocolSchedule, + () -> null, protocolContext, metricsSystem, ethContext, @@ -168,6 +170,7 @@ public void shouldNotThrowWhenFastSyncModeRequested(final DataStorageFormat data syncConfig, dataDirectory, protocolSchedule, + () -> null, protocolContext, metricsSystem, ethContext, @@ -204,6 +207,7 @@ public void shouldClearWorldStateDuringFastSyncWhenStateQueDirectoryExists( syncConfig, dataDirectory, protocolSchedule, + () -> null, protocolContext, metricsSystem, ethContext, @@ -242,6 +246,7 @@ public void shouldCrashWhenStateQueueIsNotDirectory(final DataStorageFormat data syncConfig, dataDirectory, protocolSchedule, + () -> null, protocolContext, metricsSystem, ethContext, diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java index 7af807c1c7b..7680138fc3c 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncActionsTest.java @@ -535,6 +535,7 @@ private FastSyncActions createFastSyncActions( syncConfig, worldStateStorageCoordinator, protocolSchedule, + () -> null, protocolContext, ethContext, new PeerTaskExecutor(null, null, new NoOpMetricsSystem()), diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java index d7b5970098e..c2252a117e4 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.eth.sync.fullsync; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.Blockchain; @@ -27,6 +28,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; @@ -91,7 +93,8 @@ private ChainDownloader downloader(final SynchronizerConfiguration syncConfig) { syncState, metricsSystem, SyncTerminationCondition.never(), - SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS); + SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS, + mock(PeerTaskExecutor.class)); } private ChainDownloader downloader() { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java index ac7f0fb8257..17ce3db15a1 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTest.java @@ -17,6 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain; +import static org.mockito.Mockito.mock; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.Blockchain; @@ -34,6 +35,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.messages.EthPV62; import org.hyperledger.besu.ethereum.eth.messages.GetBlockHeadersMessage; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; @@ -123,7 +125,8 @@ private ChainDownloader downloader(final SynchronizerConfiguration syncConfig) { syncState, metricsSystem, SyncTerminationCondition.never(), - SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS); + SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS, + mock(PeerTaskExecutor.class)); } private ChainDownloader downloader() { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTotalTerminalDifficultyTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTotalTerminalDifficultyTest.java index 311ccf5de30..ddcdca4dd95 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTotalTerminalDifficultyTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloaderTotalTerminalDifficultyTest.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.eth.sync.fullsync; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.chain.Blockchain; @@ -27,6 +28,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; @@ -109,7 +111,8 @@ private ChainDownloader downloader( syncState, metricsSystem, terminalCondition, - SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS); + SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS, + mock(PeerTaskExecutor.class)); } private SynchronizerConfiguration.Builder syncConfigBuilder() { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java index 63e41f6d25c..c270d3fae69 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloaderTest.java @@ -98,6 +98,7 @@ private FullSyncDownloader downloader(final SynchronizerConfiguration syncConfig syncState, metricsSystem, SyncTerminationCondition.never(), + null, SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksWithPeerTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksWithPeerTaskTest.java new file mode 100644 index 00000000000..2853fa47d26 --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksWithPeerTaskTest.java @@ -0,0 +1,243 @@ +/* + * Copyright contributors to Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.tasks; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockBody; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; +import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; +import org.hyperledger.besu.ethereum.mainnet.WithdrawalsProcessor; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class CompleteBlocksWithPeerTaskTest { + + @BeforeAll + public static void setUp() {} + + @Test + public void shouldFailWhenEmptyHeaders() { + assertThatThrownBy(() -> new CompleteBlocksWithPeerTask(null, Collections.emptyList(), null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Must supply a non-empty headers list"); + } + + @Test + public void shouldReturnEmptyBlock() { + final ProtocolSchedule protocolSchedule = getProtocolScheduleMock(); + final BlockHeader blockHeader = getEmptyBlockHeaderMock(); + final PeerTaskExecutor peerTaskExecutor = mock(PeerTaskExecutor.class); + + CompleteBlocksWithPeerTask completeBlocksWithPeerTask = + new CompleteBlocksWithPeerTask(protocolSchedule, List.of(blockHeader), peerTaskExecutor); + final List blocks = completeBlocksWithPeerTask.getBlocks(); + assertThat(blocks).isNotEmpty(); + assertThat(blocks.size()).isEqualTo(1); + assertThat(BlockHeader.hasEmptyBlock(blocks.get(0).getHeader())).isTrue(); + + verify(peerTaskExecutor, Mockito.never()).execute(any()); + } + + @Test + public void shouldCreateWithdrawalsAwareEmptyBlock_whenWithdrawalsAreEnabled() { + final ProtocolSchedule mockProtocolSchedule = Mockito.mock(ProtocolSchedule.class); + final ProtocolSpec mockParisSpec = Mockito.mock(ProtocolSpec.class); + final ProtocolSpec mockShanghaiSpec = Mockito.mock(ProtocolSpec.class); + final WithdrawalsProcessor mockWithdrawalsProcessor = Mockito.mock(WithdrawalsProcessor.class); + + final BlockHeader header1 = + new BlockHeaderTestFixture().number(1).withdrawalsRoot(null).buildHeader(); + final BlockHeader header2 = + new BlockHeaderTestFixture().number(2).withdrawalsRoot(Hash.EMPTY_TRIE_HASH).buildHeader(); + + when(mockProtocolSchedule.getByBlockHeader((eq(header1)))).thenReturn(mockParisSpec); + when(mockParisSpec.getWithdrawalsProcessor()).thenReturn(Optional.empty()); + when(mockProtocolSchedule.getByBlockHeader((eq(header2)))).thenReturn(mockShanghaiSpec); + when(mockShanghaiSpec.getWithdrawalsProcessor()) + .thenReturn(Optional.of(mockWithdrawalsProcessor)); + + final List expectedBlocks = getExpectedBlocks(header1, header2); + + final PeerTaskExecutor peerTaskExecutor = mock(PeerTaskExecutor.class); + when(peerTaskExecutor.execute(any())) + .thenReturn( + new PeerTaskExecutorResult<>( + Optional.of(expectedBlocks), PeerTaskExecutorResponseCode.SUCCESS)); + + final CompleteBlocksWithPeerTask task = + new CompleteBlocksWithPeerTask( + mockProtocolSchedule, asList(header1, header2), peerTaskExecutor); + final List blocks = task.getBlocks(); + + assertThat(blocks).isEqualTo(expectedBlocks); + } + + @Test + public void shouldReturnNonEmptyBlock() { + final Block block = mock(Block.class); + final ProtocolSchedule protocolSchedule = getProtocolScheduleMock(); + final PeerTaskExecutor peerTaskExecutor = mock(PeerTaskExecutor.class); + final BlockHeader nonEmptyBlockHeaderMock = getNonEmptyBlockHeaderMock("0x01", "0x02"); + when(peerTaskExecutor.execute(any())) + .thenReturn( + new PeerTaskExecutorResult<>( + Optional.of(List.of(block)), PeerTaskExecutorResponseCode.SUCCESS)); + + CompleteBlocksWithPeerTask completeBlocksWithPeerTask = + new CompleteBlocksWithPeerTask( + protocolSchedule, List.of(nonEmptyBlockHeaderMock), peerTaskExecutor); + + final List blocks = completeBlocksWithPeerTask.getBlocks(); + assertThat(blocks).isNotEmpty(); + assertThat(blocks.size()).isEqualTo(1); + assertThat(blocks.get(0)).isEqualTo(block); + } + + @Test + public void shouldReturnBlocksInRightOrderWhenEmptyAndNonEmptyBlocksRequested() { + final Block block1 = mock(Block.class); + final Block block3 = mock(Block.class); + final BlockHeader emptyBlockHeaderMock = getEmptyBlockHeaderMock(); + final BlockHeader nonEmptyBlockHeaderMock1 = getNonEmptyBlockHeaderMock("0x01", "0x02"); + final BlockHeader nonEmptyBlockHeaderMock3 = getNonEmptyBlockHeaderMock("0x03", "0x04"); + + final ProtocolSchedule protocolSchedule = getProtocolScheduleMock(); + final PeerTaskExecutor peerTaskExecutor = mock(PeerTaskExecutor.class); + when(peerTaskExecutor.execute(any())) + .thenReturn( + new PeerTaskExecutorResult<>( + Optional.of(List.of(block1, block3)), PeerTaskExecutorResponseCode.SUCCESS)); + + CompleteBlocksWithPeerTask completeBlocksWithPeerTask = + new CompleteBlocksWithPeerTask( + protocolSchedule, + List.of( + nonEmptyBlockHeaderMock1, + emptyBlockHeaderMock, + nonEmptyBlockHeaderMock3, + emptyBlockHeaderMock), + peerTaskExecutor); + + final List blocks = completeBlocksWithPeerTask.getBlocks(); + assertThat(blocks).isNotEmpty(); + assertThat(blocks.size()).isEqualTo(4); + assertThat(blocks.get(0)).isEqualTo(block1); + assertThat(BlockHeader.hasEmptyBlock(blocks.get(1).getHeader())).isTrue(); + assertThat(blocks.get(2)).isEqualTo(block3); + assertThat(BlockHeader.hasEmptyBlock(blocks.get(3).getHeader())).isTrue(); + } + + @Test + public void shouldRequestMoreBodiesUntilFinished() { + final Block block1 = mock(Block.class); + final Block block3 = mock(Block.class); + final BlockHeader emptyBlockHeaderMock = getEmptyBlockHeaderMock(); + final BlockHeader nonEmptyBlockHeaderMock1 = getNonEmptyBlockHeaderMock("0x01", "0x02"); + final BlockHeader nonEmptyBlockHeaderMock3 = getNonEmptyBlockHeaderMock("0x03", "0x04"); + + final ProtocolSchedule protocolSchedule = getProtocolScheduleMock(); + final PeerTaskExecutor peerTaskExecutor = mock(PeerTaskExecutor.class); + when(peerTaskExecutor.execute(any())) + .thenReturn( + new PeerTaskExecutorResult<>( + Optional.of(List.of(block1)), PeerTaskExecutorResponseCode.SUCCESS)) + .thenReturn( + new PeerTaskExecutorResult<>( + Optional.of(List.of(block3)), PeerTaskExecutorResponseCode.SUCCESS)); + + CompleteBlocksWithPeerTask completeBlocksWithPeerTask = + new CompleteBlocksWithPeerTask( + protocolSchedule, + List.of( + nonEmptyBlockHeaderMock1, + emptyBlockHeaderMock, + nonEmptyBlockHeaderMock3, + emptyBlockHeaderMock), + peerTaskExecutor); + + final List blocks = completeBlocksWithPeerTask.getBlocks(); + assertThat(blocks).isNotEmpty(); + assertThat(blocks.size()).isEqualTo(4); + assertThat(blocks.get(0)).isEqualTo(block1); + assertThat(BlockHeader.hasEmptyBlock(blocks.get(1).getHeader())).isTrue(); + assertThat(blocks.get(2)).isEqualTo(block3); + assertThat(BlockHeader.hasEmptyBlock(blocks.get(3).getHeader())).isTrue(); + } + + private static ProtocolSchedule getProtocolScheduleMock() { + final ProtocolSchedule protocolSchedule = mock(ProtocolSchedule.class); + final ProtocolSpec protocolSpec = mock(ProtocolSpec.class); + final Optional optional = Optional.of(mock(WithdrawalsProcessor.class)); + when(protocolSpec.getWithdrawalsProcessor()).thenReturn(optional); + when(protocolSchedule.getByBlockHeader(any())).thenReturn(protocolSpec); + return protocolSchedule; + } + + private static BlockHeader getEmptyBlockHeaderMock() { + final BlockHeader blockHeader = mock(BlockHeader.class); + when(blockHeader.getTransactionsRoot()).thenReturn(Hash.EMPTY_TRIE_HASH); + when(blockHeader.getOmmersHash()).thenReturn(Hash.EMPTY_LIST_HASH); + when(blockHeader.getWithdrawalsRoot()).thenReturn(Optional.empty()); + return blockHeader; + } + + private static BlockHeader getNonEmptyBlockHeaderMock( + final String transactionsRootHexString, final String ommersHash) { + final BlockHeader blockHeader = mock(BlockHeader.class); + when(blockHeader.getTransactionsRoot()) + .thenReturn(Hash.fromHexStringLenient(transactionsRootHexString)); + when(blockHeader.getOmmersHash()).thenReturn(Hash.fromHexStringLenient(ommersHash)); + when(blockHeader.getWithdrawalsRoot()).thenReturn(Optional.empty()); + return blockHeader; + } + + private static List getExpectedBlocks( + final BlockHeader header1, final BlockHeader header2) { + final Block block1 = + new Block( + header1, + new BlockBody(Collections.emptyList(), Collections.emptyList(), Optional.empty())); + final Block block2 = + new Block( + header2, + new BlockBody( + Collections.emptyList(), + Collections.emptyList(), + Optional.of(Collections.emptyList()))); + + return asList(block1, block2); + } +}