Skip to content

Commit

Permalink
try fix for parallelization
Browse files Browse the repository at this point in the history
Signed-off-by: Karim Taam <[email protected]>
  • Loading branch information
matkt committed Jan 7, 2025
1 parent 2106eea commit 9985e73
Show file tree
Hide file tree
Showing 15 changed files with 191 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private void handleFailedBlockProcessing(
protected BlockProcessingResult processBlock(
final ProtocolContext context, final MutableWorldState worldState, final Block block) {

return blockProcessor.processBlock(context.getBlockchain(), worldState, block);
return blockProcessor.processBlock(context, context.getBlockchain(), worldState, block);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.BlockProcessingOutputs;
import org.hyperledger.besu.ethereum.BlockProcessingResult;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
Expand Down Expand Up @@ -95,6 +96,7 @@ protected AbstractBlockProcessor(

@Override
public BlockProcessingResult processBlock(
final ProtocolContext protocolContext,
final Blockchain blockchain,
final MutableWorldState worldState,
final BlockHeader blockHeader,
Expand All @@ -103,6 +105,7 @@ public BlockProcessingResult processBlock(
final Optional<List<Withdrawal>> maybeWithdrawals,
final PrivateMetadataUpdater privateMetadataUpdater) {
return processBlock(
protocolContext,
blockchain,
worldState,
blockHeader,
Expand All @@ -114,6 +117,7 @@ public BlockProcessingResult processBlock(
}

protected BlockProcessingResult processBlock(
final ProtocolContext protocolContext,
final Blockchain blockchain,
final MutableWorldState worldState,
final BlockHeader blockHeader,
Expand Down Expand Up @@ -148,7 +152,7 @@ protected BlockProcessingResult processBlock(

final Optional<PreprocessingContext> preProcessingContext =
preprocessingBlockFunction.run(
worldState,
protocolContext,
privateMetadataUpdater,
blockHeader,
transactions,
Expand Down Expand Up @@ -239,7 +243,7 @@ protected BlockProcessingResult processBlock(
protocolSpec.getRequestProcessorCoordinator();
Optional<List<Request>> maybeRequests = Optional.empty();
if (requestProcessor.isPresent()) {
ProcessRequestContext context =
ProcessRequestContext processRequestContext =
new ProcessRequestContext(
blockHeader,
worldState,
Expand All @@ -248,7 +252,7 @@ protected BlockProcessingResult processBlock(
blockHashLookup,
OperationTracer.NO_TRACING);

maybeRequests = Optional.of(requestProcessor.get().process(context));
maybeRequests = Optional.of(requestProcessor.get().process(processRequestContext));
}

if (!rewardCoinbase(worldState, blockHeader, ommers, skipZeroBlockRewards)) {
Expand Down Expand Up @@ -332,7 +336,7 @@ public interface PreprocessingContext {}

public interface PreprocessingFunction {
Optional<PreprocessingContext> run(
final MutableWorldState worldState,
final ProtocolContext protocolContext,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
Expand All @@ -344,7 +348,7 @@ class NoPreprocessing implements PreprocessingFunction {

@Override
public Optional<PreprocessingContext> run(
final MutableWorldState worldState,
final ProtocolContext protocolContext,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.BlockProcessingResult;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
Expand Down Expand Up @@ -68,14 +69,19 @@ default boolean isFailed() {
/**
* Processes the block.
*
* @param protocolContext the current context of the protocol
* @param blockchain the blockchain to append the block to
* @param worldState the world state to apply changes to
* @param block the block to process
* @return the block processing result
*/
default BlockProcessingResult processBlock(
final Blockchain blockchain, final MutableWorldState worldState, final Block block) {
final ProtocolContext protocolContext,
final Blockchain blockchain,
final MutableWorldState worldState,
final Block block) {
return processBlock(
protocolContext,
blockchain,
worldState,
block.getHeader(),
Expand All @@ -88,6 +94,7 @@ default BlockProcessingResult processBlock(
/**
* Processes the block.
*
* @param protocolContext the current context of the protocol
* @param blockchain the blockchain to append the block to
* @param worldState the world state to apply changes to
* @param blockHeader the block header for the block
Expand All @@ -96,18 +103,27 @@ default BlockProcessingResult processBlock(
* @return the block processing result
*/
default BlockProcessingResult processBlock(
final ProtocolContext protocolContext,
final Blockchain blockchain,
final MutableWorldState worldState,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final List<BlockHeader> ommers) {
return processBlock(
blockchain, worldState, blockHeader, transactions, ommers, Optional.empty(), null);
protocolContext,
blockchain,
worldState,
blockHeader,
transactions,
ommers,
Optional.empty(),
null);
}

/**
* Processes the block.
*
* @param protocolContext the current context of the protocol
* @param blockchain the blockchain to append the block to
* @param worldState the world state to apply changes to
* @param blockHeader the block header for the block
Expand All @@ -118,6 +134,7 @@ default BlockProcessingResult processBlock(
* @return the block processing result
*/
BlockProcessingResult processBlock(
ProtocolContext protocolContext,
Blockchain blockchain,
MutableWorldState worldState,
BlockHeader blockHeader,
Expand All @@ -129,13 +146,15 @@ BlockProcessingResult processBlock(
/**
* Processes the block when running Besu in GoQuorum-compatible mode
*
* @param protocolContext the current context of the protocol
* @param blockchain the blockchain to append the block to
* @param worldState the world state to apply public transactions to
* @param privateWorldState the private world state to apply private transaction to
* @param block the block to process
* @return the block processing result
*/
default BlockProcessingResult processBlock(
final ProtocolContext protocolContext,
final Blockchain blockchain,
final MutableWorldState worldState,
final MutableWorldState privateWorldState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.BlockProcessingResult;
import org.hyperledger.besu.ethereum.MainnetBlockValidator;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MiningConfiguration;
Expand Down Expand Up @@ -988,6 +989,7 @@ private record DaoBlockProcessor(BlockProcessor wrapped) implements BlockProcess

@Override
public BlockProcessingResult processBlock(
final ProtocolContext protocolContext,
final Blockchain blockchain,
final MutableWorldState worldState,
final BlockHeader blockHeader,
Expand All @@ -997,6 +999,7 @@ public BlockProcessingResult processBlock(
final PrivateMetadataUpdater privateMetadataUpdater) {
updateWorldStateForDao(worldState);
return wrapped.processBlock(
protocolContext,
blockchain,
worldState,
blockHeader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.hyperledger.besu.enclave.EnclaveClientException;
import org.hyperledger.besu.enclave.types.ReceiveResponse;
import org.hyperledger.besu.ethereum.BlockProcessingResult;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
Expand Down Expand Up @@ -83,6 +84,7 @@ public void setPublicWorldStateArchive(final WorldStateArchive publicWorldStateA

@Override
public BlockProcessingResult processBlock(
final ProtocolContext protocolContext,
final Blockchain blockchain,
final MutableWorldState worldState,
final BlockHeader blockHeader,
Expand All @@ -102,6 +104,7 @@ public BlockProcessingResult processBlock(

final BlockProcessingResult result =
blockProcessor.processBlock(
protocolContext,
blockchain,
worldState,
blockHeader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.BlockProcessingResult;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
Expand All @@ -31,7 +32,7 @@
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpecBuilder;
import org.hyperledger.besu.ethereum.privacy.storage.PrivateMetadataUpdater;
import org.hyperledger.besu.ethereum.processing.TransactionProcessingResult;
import org.hyperledger.besu.ethereum.trie.diffbased.common.worldview.DiffBasedWorldState;
import org.hyperledger.besu.ethereum.trie.diffbased.common.DiffBasedWorldStateProvider;
import org.hyperledger.besu.evm.operation.BlockHashOperation;
import org.hyperledger.besu.evm.worldstate.WorldUpdater;
import org.hyperledger.besu.metrics.BesuMetricCategory;
Expand Down Expand Up @@ -137,6 +138,7 @@ protected TransactionProcessingResult getTransactionProcessingResult(

@Override
public BlockProcessingResult processBlock(
final ProtocolContext protocolContext,
final Blockchain blockchain,
final MutableWorldState worldState,
final BlockHeader blockHeader,
Expand All @@ -146,6 +148,7 @@ public BlockProcessingResult processBlock(
final PrivateMetadataUpdater privateMetadataUpdater) {
final BlockProcessingResult blockProcessingResult =
super.processBlock(
protocolContext,
blockchain,
worldState,
blockHeader,
Expand All @@ -154,13 +157,15 @@ public BlockProcessingResult processBlock(
maybeWithdrawals,
privateMetadataUpdater,
new ParallelTransactionPreprocessing());

if (blockProcessingResult.isFailed()) {
// Fallback to non-parallel processing if there is a block processing exception .
LOG.warn(
"Block processing failed. Falling back to non-parallel processing for block #{} ({})",
blockHeader.getNumber(),
blockHeader.getBlockHash());
return super.processBlock(
protocolContext,
blockchain,
worldState,
blockHeader,
Expand Down Expand Up @@ -209,20 +214,20 @@ class ParallelTransactionPreprocessing implements PreprocessingFunction {

@Override
public Optional<PreprocessingContext> run(
final MutableWorldState worldState,
final ProtocolContext protocolContext,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashOperation.BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
if ((worldState instanceof DiffBasedWorldState)) {
if ((protocolContext.getWorldStateArchive() instanceof DiffBasedWorldStateProvider)) {
ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor =
new ParallelizedConcurrentTransactionProcessor(transactionProcessor);
// runAsyncBlock, if activated, facilitates the non-blocking parallel execution of
// transactions in the background through an optimistic strategy.
parallelizedConcurrentTransactionProcessor.runAsyncBlock(
worldState,
protocolContext,
blockHeader,
transactions,
miningBeneficiary,
Expand Down
Loading

0 comments on commit 9985e73

Please sign in to comment.