Skip to content

Commit

Permalink
Disable txpool when not in sync (hyperledger#6302)
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Di Fabio <[email protected]>
  • Loading branch information
fab-10 authored Jan 10, 2024
1 parent d636e7f commit 0b4b815
Show file tree
Hide file tree
Showing 14 changed files with 300 additions and 76 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- Set Ethereum Classic mainnet activation block for Spiral network upgrade [#6267](https://github.com/hyperledger/besu/pull/6267)
- Add custom genesis file name to config overview if specified [#6297](https://github.com/hyperledger/besu/pull/6297)
- Update Gradle plugins and replace unmaintained License Gradle Plugin with the actively maintained Gradle License Report [#6275](https://github.com/hyperledger/besu/pull/6275)
- Disable transaction handling when the node is not in sync, to avoid unnecessary transaction validation work [#6302](https://github.com/hyperledger/besu/pull/6302)
- Optimize RocksDB WAL files, allows for faster restart and a more linear disk space utilization [#6328](https://github.com/hyperledger/besu/pull/6328)

### Bug fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
Expand All @@ -64,6 +63,7 @@
import org.hyperledger.besu.plugin.data.PropagatedBlockContext;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.testutil.DeterministicEthScheduler;
import org.hyperledger.besu.testutil.TestClock;

import java.math.BigInteger;
Expand Down Expand Up @@ -100,7 +100,6 @@ public class BesuEventsImplTest {
@Mock private EthPeers mockEthPeers;
@Mock private EthContext mockEthContext;
@Mock private EthMessages mockEthMessages;
@Mock private EthScheduler mockEthScheduler;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private TransactionValidatorFactory mockTransactionValidatorFactory;
Expand Down Expand Up @@ -128,7 +127,7 @@ public void setUp() {

when(mockEthContext.getEthMessages()).thenReturn(mockEthMessages);
when(mockEthContext.getEthPeers()).thenReturn(mockEthPeers);
when(mockEthContext.getScheduler()).thenReturn(mockEthScheduler);
when(mockEthContext.getScheduler()).thenReturn(new DeterministicEthScheduler());
lenient().when(mockEthPeers.streamAvailablePeers()).thenAnswer(z -> Stream.empty());
when(mockProtocolContext.getBlockchain()).thenReturn(blockchain);
lenient().when(mockProtocolContext.getWorldStateArchive()).thenReturn(mockWorldStateArchive);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -107,7 +108,7 @@ public void setUp() {
blockchain::getChainHeadHeader);
final ProtocolContext protocolContext = executionContext.getProtocolContext();

EthContext ethContext = mock(EthContext.class);
EthContext ethContext = mock(EthContext.class, RETURNS_DEEP_STUBS);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -107,7 +108,7 @@ public void setUp() {
blockchain::getChainHeadHeader);
final ProtocolContext protocolContext = executionContext.getProtocolContext();

EthContext ethContext = mock(EthContext.class);
EthContext ethContext = mock(EthContext.class, RETURNS_DEEP_STUBS);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@

import java.time.Duration;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand All @@ -34,6 +36,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -295,4 +299,49 @@ public <T> void failAfterTimeout(final CompletableFuture<T> promise, final Durat
delay,
unit);
}

public <ITEM> OrderedProcessor<ITEM> createOrderedProcessor(final Consumer<ITEM> processor) {
return new OrderedProcessor<>(processor);
}

/**
* This class is a way to execute a set of tasks, one by one, in a strict order, without blocking
* the caller in case there are still previous tasks queued
*
* @param <ITEM> the class of item to be processed
*/
public class OrderedProcessor<ITEM> {
private final Queue<ITEM> blockAddedQueue = new ConcurrentLinkedQueue<>();
private final ReentrantLock blockAddedLock = new ReentrantLock();
private final Consumer<ITEM> processor;

private OrderedProcessor(final Consumer<ITEM> processor) {
this.processor = processor;
}

public void submit(final ITEM item) {
// add the item to the processing queue
blockAddedQueue.add(item);

if (blockAddedLock.hasQueuedThreads()) {
// another thread is already waiting to process the queue with our item, there is no need to
// schedule another thread
LOG.trace(
"Block added event queue is already being processed and an already queued thread is present, nothing to do");
} else {
servicesExecutor.submit(
() -> {
blockAddedLock.lock();
try {
// now that we have the lock, process as many items as possible
for (ITEM i = blockAddedQueue.poll(); i != null; i = blockAddedQueue.poll()) {
processor.accept(i);
}
} finally {
blockAddedLock.unlock();
}
});
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.TransactionValidationParams;
import org.hyperledger.besu.ethereum.mainnet.TransactionValidator;
Expand Down Expand Up @@ -61,11 +62,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -107,8 +106,7 @@ public class TransactionPool implements BlockAddedObserver {
private volatile OptionalLong subscribeConnectId = OptionalLong.empty();
private final SaveRestoreManager saveRestoreManager = new SaveRestoreManager();
private final Set<Address> localSenders = ConcurrentHashMap.newKeySet();
private final Lock blockAddedLock = new ReentrantLock();
private final Queue<BlockAddedEvent> blockAddedQueue = new ConcurrentLinkedQueue<>();
private final EthScheduler.OrderedProcessor<BlockAddedEvent> blockAddedEventOrderedProcessor;

public TransactionPool(
final Supplier<PendingTransactions> pendingTransactionsSupplier,
Expand All @@ -130,6 +128,8 @@ public TransactionPool(
pluginTransactionValidatorFactory == null
? null
: pluginTransactionValidatorFactory.create();
this.blockAddedEventOrderedProcessor =
ethContext.getScheduler().createOrderedProcessor(this::processBlockAddedEvent);
initLogForReplay();
}

Expand Down Expand Up @@ -322,58 +322,29 @@ public void unsubscribeDroppedTransactions(final long id) {
@Override
public void onBlockAdded(final BlockAddedEvent event) {
if (isPoolEnabled.get()) {
final long started = System.currentTimeMillis();
if (event.getEventType().equals(BlockAddedEvent.EventType.HEAD_ADVANCED)
|| event.getEventType().equals(BlockAddedEvent.EventType.CHAIN_REORG)) {

// add the event to the processing queue
blockAddedQueue.add(event);

// we want to process the added block asynchronously,
// but at the same time we must ensure that blocks are processed in order one at time
ethContext
.getScheduler()
.scheduleServiceTask(
() -> {
while (!blockAddedQueue.isEmpty()) {
if (blockAddedLock.tryLock()) {
// no other thread is processing the queue, so start processing it
try {
BlockAddedEvent e = blockAddedQueue.poll();
// check again since another thread could have stolen our task
if (e != null) {
pendingTransactions.manageBlockAdded(
e.getBlock().getHeader(),
e.getAddedTransactions(),
e.getRemovedTransactions(),
protocolSchedule
.getByBlockHeader(e.getBlock().getHeader())
.getFeeMarket());
reAddTransactions(e.getRemovedTransactions());
LOG.atTrace()
.setMessage("Block added event {} processed in {}ms")
.addArgument(e)
.addArgument(() -> System.currentTimeMillis() - started)
.log();
}
} finally {
blockAddedLock.unlock();
}
} else {
try {
// wait a bit before retrying
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
return null;
});
blockAddedEventOrderedProcessor.submit(event);
}
}
}

private void processBlockAddedEvent(final BlockAddedEvent e) {
final long started = System.currentTimeMillis();
pendingTransactions.manageBlockAdded(
e.getBlock().getHeader(),
e.getAddedTransactions(),
e.getRemovedTransactions(),
protocolSchedule.getByBlockHeader(e.getBlock().getHeader()).getFeeMarket());
reAddTransactions(e.getRemovedTransactions());
LOG.atTrace()
.setMessage("Block added event {} processed in {}ms")
.addArgument(e)
.addArgument(() -> System.currentTimeMillis() - started)
.log();
}

private void reAddTransactions(final List<Transaction> reAddTransactions) {
if (!reAddTransactions.isEmpty()) {
// if adding a blob tx, and it is missing its blob, is a re-org and we should restore the blob
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,24 +156,62 @@ static TransactionPool createTransactionPool(
@Override
public void onInitialSyncCompleted() {
LOG.info("Enabling transaction handling following initial sync");
transactionTracker.reset();
transactionPool.setEnabled();
transactionsMessageHandler.setEnabled();
pooledTransactionsMessageHandler.setEnabled();
enableTransactionHandling(
transactionTracker,
transactionPool,
transactionsMessageHandler,
pooledTransactionsMessageHandler);
}

@Override
public void onInitialSyncRestart() {
LOG.info("Disabling transaction handling during re-sync");
pooledTransactionsMessageHandler.setDisabled();
transactionsMessageHandler.setDisabled();
transactionPool.setDisabled();
disableTransactionHandling(
transactionPool, transactionsMessageHandler, pooledTransactionsMessageHandler);
}
});

syncState.subscribeInSync(
isInSync -> {
if (isInSync != transactionPool.isEnabled()) {
if (isInSync) {
LOG.info("Node is in sync, enabling transaction handling");
enableTransactionHandling(
transactionTracker,
transactionPool,
transactionsMessageHandler,
pooledTransactionsMessageHandler);
} else {
LOG.info("Node out of sync, disabling transaction handling");
disableTransactionHandling(
transactionPool, transactionsMessageHandler, pooledTransactionsMessageHandler);
}
}
});

return transactionPool;
}

private static void enableTransactionHandling(
final PeerTransactionTracker transactionTracker,
final TransactionPool transactionPool,
final TransactionsMessageHandler transactionsMessageHandler,
final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler) {
transactionTracker.reset();
transactionPool.setEnabled();
transactionsMessageHandler.setEnabled();
pooledTransactionsMessageHandler.setEnabled();
}

private static void disableTransactionHandling(
final TransactionPool transactionPool,
final TransactionsMessageHandler transactionsMessageHandler,
final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler) {
transactionPool.setDisabled();
transactionsMessageHandler.setDisabled();
pooledTransactionsMessageHandler.setDisabled();
}

private static void subscribeTransactionHandlers(
final ProtocolContext protocolContext,
final EthContext ethContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.ReplaceableDoubleSupplier;
import org.hyperledger.besu.metrics.RunnableCounter;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
Expand Down Expand Up @@ -46,6 +47,9 @@ public class TransactionPoolMetrics {
private final LabelledMetric<Counter> expiredMessagesCounter;
private final Map<String, RunnableCounter> expiredMessagesRunnableCounters = new HashMap<>();
private final LabelledMetric<Counter> alreadySeenTransactionsCounter;
private final Map<String, ReplaceableDoubleSupplier> spaceUsedSuppliers = new HashMap<>();
private final Map<String, ReplaceableDoubleSupplier> transactionCountSuppliers = new HashMap<>();
private final Map<String, ReplaceableDoubleSupplier> uniqueSendersSuppliers = new HashMap<>();

public TransactionPoolMetrics(final MetricsSystem metricsSystem) {
this.metricsSystem = metricsSystem;
Expand Down Expand Up @@ -120,17 +124,44 @@ public MetricsSystem getMetricsSystem() {
}

public void initSpaceUsed(final DoubleSupplier spaceUsedSupplier, final String layer) {
spaceUsed.labels(spaceUsedSupplier, layer);
spaceUsedSuppliers.compute(
layer,
(unused, existingSupplier) -> {
if (existingSupplier == null) {
final var newSupplier = new ReplaceableDoubleSupplier(spaceUsedSupplier);
spaceUsed.labels(newSupplier, layer);
return newSupplier;
}
return existingSupplier.replaceDoubleSupplier(spaceUsedSupplier);
});
}

public void initTransactionCount(
final DoubleSupplier transactionCountSupplier, final String layer) {
transactionCount.labels(transactionCountSupplier, layer);
transactionCountSuppliers.compute(
layer,
(unused, existingSupplier) -> {
if (existingSupplier == null) {
final var newSupplier = new ReplaceableDoubleSupplier(transactionCountSupplier);
transactionCount.labels(newSupplier, layer);
return newSupplier;
}
return existingSupplier.replaceDoubleSupplier(transactionCountSupplier);
});
}

public void initUniqueSenderCount(
final DoubleSupplier uniqueSenderCountSupplier, final String layer) {
uniqueSenderCount.labels(uniqueSenderCountSupplier, layer);
uniqueSendersSuppliers.compute(
layer,
(unused, existingSupplier) -> {
if (existingSupplier == null) {
final var newSupplier = new ReplaceableDoubleSupplier(uniqueSenderCountSupplier);
uniqueSenderCount.labels(newSupplier, layer);
return newSupplier;
}
return existingSupplier.replaceDoubleSupplier(uniqueSenderCountSupplier);
});
}

public void initExpiredMessagesCounter(final String message) {
Expand Down
Loading

0 comments on commit 0b4b815

Please sign in to comment.