From 0b4b81595e04d822bb6c5cc9e845f5cb2188f981 Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Wed, 10 Jan 2024 14:56:37 +0100 Subject: [PATCH] Disable txpool when not in sync (#6302) Signed-off-by: Fabio Di Fabio --- CHANGELOG.md | 1 + .../besu/services/BesuEventsImplTest.java | 5 +- .../EthGetFilterChangesIntegrationTest.java | 3 +- .../EthGetFilterChangesIntegrationTest.java | 3 +- .../ethereum/eth/manager/EthScheduler.java | 49 +++++++++++++ .../eth/transactions/TransactionPool.java | 69 ++++++------------- .../transactions/TransactionPoolFactory.java | 52 ++++++++++++-- .../transactions/TransactionPoolMetrics.java | 37 +++++++++- .../eth/manager/EthSchedulerTest.java | 47 +++++++++++++ .../ethtaskutils/AbstractMessageTaskTest.java | 6 +- .../ethtaskutils/PeerMessageTaskTest.java | 6 +- .../AbstractTransactionPoolTest.java | 7 +- .../metrics/ReplaceableDoubleSupplier.java | 55 +++++++++++++++ .../ReplaceableDoubleSupplierTest.java | 36 ++++++++++ 14 files changed, 300 insertions(+), 76 deletions(-) create mode 100644 metrics/core/src/main/java/org/hyperledger/besu/metrics/ReplaceableDoubleSupplier.java create mode 100644 metrics/core/src/test/java/org/hyperledger/besu/metrics/ReplaceableDoubleSupplierTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f0fad70fef..5f51e8b4c2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ - Set Ethereum Classic mainnet activation block for Spiral network upgrade [#6267](https://github.com/hyperledger/besu/pull/6267) - Add custom genesis file name to config overview if specified [#6297](https://github.com/hyperledger/besu/pull/6297) - Update Gradle plugins and replace unmaintained License Gradle Plugin with the actively maintained Gradle License Report [#6275](https://github.com/hyperledger/besu/pull/6275) +- Disable transaction handling when the node is not in sync, to avoid unnecessary transaction validation work [#6302](https://github.com/hyperledger/besu/pull/6302) - Optimize RocksDB WAL files, allows for faster restart and a more linear disk space utilization [#6328](https://github.com/hyperledger/besu/pull/6328) ### Bug fixes diff --git a/besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java b/besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java index 25db1cb9b49..93cf830faf1 100644 --- a/besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java +++ b/besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java @@ -41,7 +41,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthMessages; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeers; -import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.BlobCache; @@ -64,6 +63,7 @@ import org.hyperledger.besu.plugin.data.PropagatedBlockContext; import org.hyperledger.besu.plugin.data.SyncStatus; import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage; +import org.hyperledger.besu.testutil.DeterministicEthScheduler; import org.hyperledger.besu.testutil.TestClock; import java.math.BigInteger; @@ -100,7 +100,6 @@ public class BesuEventsImplTest { @Mock private EthPeers mockEthPeers; @Mock private EthContext mockEthContext; @Mock private EthMessages mockEthMessages; - @Mock private EthScheduler mockEthScheduler; @Mock(answer = Answers.RETURNS_DEEP_STUBS) private TransactionValidatorFactory mockTransactionValidatorFactory; @@ -128,7 +127,7 @@ public void setUp() { when(mockEthContext.getEthMessages()).thenReturn(mockEthMessages); when(mockEthContext.getEthPeers()).thenReturn(mockEthPeers); - when(mockEthContext.getScheduler()).thenReturn(mockEthScheduler); + when(mockEthContext.getScheduler()).thenReturn(new DeterministicEthScheduler()); lenient().when(mockEthPeers.streamAvailablePeers()).thenAnswer(z -> Stream.empty()); when(mockProtocolContext.getBlockchain()).thenReturn(blockchain); lenient().when(mockProtocolContext.getWorldStateArchive()).thenReturn(mockWorldStateArchive); diff --git a/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/frontier/EthGetFilterChangesIntegrationTest.java b/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/frontier/EthGetFilterChangesIntegrationTest.java index cff37a0392b..1864246401e 100644 --- a/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/frontier/EthGetFilterChangesIntegrationTest.java +++ b/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/frontier/EthGetFilterChangesIntegrationTest.java @@ -18,6 +18,7 @@ import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -107,7 +108,7 @@ public void setUp() { blockchain::getChainHeadHeader); final ProtocolContext protocolContext = executionContext.getProtocolContext(); - EthContext ethContext = mock(EthContext.class); + EthContext ethContext = mock(EthContext.class, RETURNS_DEEP_STUBS); EthPeers ethPeers = mock(EthPeers.class); when(ethContext.getEthPeers()).thenReturn(ethPeers); diff --git a/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/london/EthGetFilterChangesIntegrationTest.java b/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/london/EthGetFilterChangesIntegrationTest.java index 0026c56ad84..49a2fbcd87b 100644 --- a/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/london/EthGetFilterChangesIntegrationTest.java +++ b/ethereum/api/src/integration-test/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/fork/london/EthGetFilterChangesIntegrationTest.java @@ -18,6 +18,7 @@ import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -107,7 +108,7 @@ public void setUp() { blockchain::getChainHeadHeader); final ProtocolContext protocolContext = executionContext.getProtocolContext(); - EthContext ethContext = mock(EthContext.class); + EthContext ethContext = mock(EthContext.class, RETURNS_DEEP_STUBS); EthPeers ethPeers = mock(EthPeers.class); when(ethContext.getEthPeers()).thenReturn(ethPeers); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java index 43f8d6a4c35..61a01bf54f3 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java @@ -23,9 +23,11 @@ import java.time.Duration; import java.util.Collection; +import java.util.Queue; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -34,6 +36,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -295,4 +299,49 @@ public void failAfterTimeout(final CompletableFuture promise, final Durat delay, unit); } + + public OrderedProcessor createOrderedProcessor(final Consumer processor) { + return new OrderedProcessor<>(processor); + } + + /** + * This class is a way to execute a set of tasks, one by one, in a strict order, without blocking + * the caller in case there are still previous tasks queued + * + * @param the class of item to be processed + */ + public class OrderedProcessor { + private final Queue blockAddedQueue = new ConcurrentLinkedQueue<>(); + private final ReentrantLock blockAddedLock = new ReentrantLock(); + private final Consumer processor; + + private OrderedProcessor(final Consumer processor) { + this.processor = processor; + } + + public void submit(final ITEM item) { + // add the item to the processing queue + blockAddedQueue.add(item); + + if (blockAddedLock.hasQueuedThreads()) { + // another thread is already waiting to process the queue with our item, there is no need to + // schedule another thread + LOG.trace( + "Block added event queue is already being processed and an already queued thread is present, nothing to do"); + } else { + servicesExecutor.submit( + () -> { + blockAddedLock.lock(); + try { + // now that we have the lock, process as many items as possible + for (ITEM i = blockAddedQueue.poll(); i != null; i = blockAddedQueue.poll()) { + processor.accept(i); + } + } finally { + blockAddedLock.unlock(); + } + }); + } + } + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java index 5425cf1c383..a3d164105d0 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java @@ -31,6 +31,7 @@ import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.TransactionValidationParams; import org.hyperledger.besu.ethereum.mainnet.TransactionValidator; @@ -61,11 +62,9 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalLong; -import java.util.Queue; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -107,8 +106,7 @@ public class TransactionPool implements BlockAddedObserver { private volatile OptionalLong subscribeConnectId = OptionalLong.empty(); private final SaveRestoreManager saveRestoreManager = new SaveRestoreManager(); private final Set
localSenders = ConcurrentHashMap.newKeySet(); - private final Lock blockAddedLock = new ReentrantLock(); - private final Queue blockAddedQueue = new ConcurrentLinkedQueue<>(); + private final EthScheduler.OrderedProcessor blockAddedEventOrderedProcessor; public TransactionPool( final Supplier pendingTransactionsSupplier, @@ -130,6 +128,8 @@ public TransactionPool( pluginTransactionValidatorFactory == null ? null : pluginTransactionValidatorFactory.create(); + this.blockAddedEventOrderedProcessor = + ethContext.getScheduler().createOrderedProcessor(this::processBlockAddedEvent); initLogForReplay(); } @@ -322,58 +322,29 @@ public void unsubscribeDroppedTransactions(final long id) { @Override public void onBlockAdded(final BlockAddedEvent event) { if (isPoolEnabled.get()) { - final long started = System.currentTimeMillis(); if (event.getEventType().equals(BlockAddedEvent.EventType.HEAD_ADVANCED) || event.getEventType().equals(BlockAddedEvent.EventType.CHAIN_REORG)) { - // add the event to the processing queue - blockAddedQueue.add(event); - - // we want to process the added block asynchronously, - // but at the same time we must ensure that blocks are processed in order one at time - ethContext - .getScheduler() - .scheduleServiceTask( - () -> { - while (!blockAddedQueue.isEmpty()) { - if (blockAddedLock.tryLock()) { - // no other thread is processing the queue, so start processing it - try { - BlockAddedEvent e = blockAddedQueue.poll(); - // check again since another thread could have stolen our task - if (e != null) { - pendingTransactions.manageBlockAdded( - e.getBlock().getHeader(), - e.getAddedTransactions(), - e.getRemovedTransactions(), - protocolSchedule - .getByBlockHeader(e.getBlock().getHeader()) - .getFeeMarket()); - reAddTransactions(e.getRemovedTransactions()); - LOG.atTrace() - .setMessage("Block added event {} processed in {}ms") - .addArgument(e) - .addArgument(() -> System.currentTimeMillis() - started) - .log(); - } - } finally { - blockAddedLock.unlock(); - } - } else { - try { - // wait a bit before retrying - Thread.sleep(100); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - return null; - }); + blockAddedEventOrderedProcessor.submit(event); } } } + private void processBlockAddedEvent(final BlockAddedEvent e) { + final long started = System.currentTimeMillis(); + pendingTransactions.manageBlockAdded( + e.getBlock().getHeader(), + e.getAddedTransactions(), + e.getRemovedTransactions(), + protocolSchedule.getByBlockHeader(e.getBlock().getHeader()).getFeeMarket()); + reAddTransactions(e.getRemovedTransactions()); + LOG.atTrace() + .setMessage("Block added event {} processed in {}ms") + .addArgument(e) + .addArgument(() -> System.currentTimeMillis() - started) + .log(); + } + private void reAddTransactions(final List reAddTransactions) { if (!reAddTransactions.isEmpty()) { // if adding a blob tx, and it is missing its blob, is a re-org and we should restore the blob diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java index 97c2a1538fa..253d37a4758 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java @@ -156,24 +156,62 @@ static TransactionPool createTransactionPool( @Override public void onInitialSyncCompleted() { LOG.info("Enabling transaction handling following initial sync"); - transactionTracker.reset(); - transactionPool.setEnabled(); - transactionsMessageHandler.setEnabled(); - pooledTransactionsMessageHandler.setEnabled(); + enableTransactionHandling( + transactionTracker, + transactionPool, + transactionsMessageHandler, + pooledTransactionsMessageHandler); } @Override public void onInitialSyncRestart() { LOG.info("Disabling transaction handling during re-sync"); - pooledTransactionsMessageHandler.setDisabled(); - transactionsMessageHandler.setDisabled(); - transactionPool.setDisabled(); + disableTransactionHandling( + transactionPool, transactionsMessageHandler, pooledTransactionsMessageHandler); + } + }); + + syncState.subscribeInSync( + isInSync -> { + if (isInSync != transactionPool.isEnabled()) { + if (isInSync) { + LOG.info("Node is in sync, enabling transaction handling"); + enableTransactionHandling( + transactionTracker, + transactionPool, + transactionsMessageHandler, + pooledTransactionsMessageHandler); + } else { + LOG.info("Node out of sync, disabling transaction handling"); + disableTransactionHandling( + transactionPool, transactionsMessageHandler, pooledTransactionsMessageHandler); + } } }); return transactionPool; } + private static void enableTransactionHandling( + final PeerTransactionTracker transactionTracker, + final TransactionPool transactionPool, + final TransactionsMessageHandler transactionsMessageHandler, + final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler) { + transactionTracker.reset(); + transactionPool.setEnabled(); + transactionsMessageHandler.setEnabled(); + pooledTransactionsMessageHandler.setEnabled(); + } + + private static void disableTransactionHandling( + final TransactionPool transactionPool, + final TransactionsMessageHandler transactionsMessageHandler, + final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler) { + transactionPool.setDisabled(); + transactionsMessageHandler.setDisabled(); + pooledTransactionsMessageHandler.setDisabled(); + } + private static void subscribeTransactionHandlers( final ProtocolContext protocolContext, final EthContext ethContext, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java index 9ace1819cc1..20657304f0f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java @@ -16,6 +16,7 @@ import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason; import org.hyperledger.besu.metrics.BesuMetricCategory; +import org.hyperledger.besu.metrics.ReplaceableDoubleSupplier; import org.hyperledger.besu.metrics.RunnableCounter; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.metrics.Counter; @@ -46,6 +47,9 @@ public class TransactionPoolMetrics { private final LabelledMetric expiredMessagesCounter; private final Map expiredMessagesRunnableCounters = new HashMap<>(); private final LabelledMetric alreadySeenTransactionsCounter; + private final Map spaceUsedSuppliers = new HashMap<>(); + private final Map transactionCountSuppliers = new HashMap<>(); + private final Map uniqueSendersSuppliers = new HashMap<>(); public TransactionPoolMetrics(final MetricsSystem metricsSystem) { this.metricsSystem = metricsSystem; @@ -120,17 +124,44 @@ public MetricsSystem getMetricsSystem() { } public void initSpaceUsed(final DoubleSupplier spaceUsedSupplier, final String layer) { - spaceUsed.labels(spaceUsedSupplier, layer); + spaceUsedSuppliers.compute( + layer, + (unused, existingSupplier) -> { + if (existingSupplier == null) { + final var newSupplier = new ReplaceableDoubleSupplier(spaceUsedSupplier); + spaceUsed.labels(newSupplier, layer); + return newSupplier; + } + return existingSupplier.replaceDoubleSupplier(spaceUsedSupplier); + }); } public void initTransactionCount( final DoubleSupplier transactionCountSupplier, final String layer) { - transactionCount.labels(transactionCountSupplier, layer); + transactionCountSuppliers.compute( + layer, + (unused, existingSupplier) -> { + if (existingSupplier == null) { + final var newSupplier = new ReplaceableDoubleSupplier(transactionCountSupplier); + transactionCount.labels(newSupplier, layer); + return newSupplier; + } + return existingSupplier.replaceDoubleSupplier(transactionCountSupplier); + }); } public void initUniqueSenderCount( final DoubleSupplier uniqueSenderCountSupplier, final String layer) { - uniqueSenderCount.labels(uniqueSenderCountSupplier, layer); + uniqueSendersSuppliers.compute( + layer, + (unused, existingSupplier) -> { + if (existingSupplier == null) { + final var newSupplier = new ReplaceableDoubleSupplier(uniqueSenderCountSupplier); + uniqueSenderCount.labels(newSupplier, layer); + return newSupplier; + } + return existingSupplier.replaceDoubleSupplier(uniqueSenderCountSupplier); + }); } public void initExpiredMessagesCounter(final String message) { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthSchedulerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthSchedulerTest.java index d09f02c6b76..e7797576d64 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthSchedulerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthSchedulerTest.java @@ -21,16 +21,24 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.testutil.DeterministicEthScheduler; import org.hyperledger.besu.testutil.MockExecutorService; import org.hyperledger.besu.testutil.MockScheduledExecutor; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.stream.IntStream; +import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -209,4 +217,43 @@ public void timeout_cancelsTaskWhenResultIsCancelled() { assertThat(task.isFailed()).isTrue(); assertThat(task.isCancelled()).isTrue(); } + + @Test + public void itemsSubmittedToOrderedProcessorAreProcessedInOrder() throws InterruptedException { + final int numOfItems = 100; + final Random random = new Random(); + final EthScheduler realEthScheduler = new EthScheduler(1, 1, 1, new NoOpMetricsSystem()); + + final List processedStrings = new CopyOnWriteArrayList<>(); + + final Consumer stringProcessor = + s -> { + processedStrings.add(s); + try { + Thread.sleep(random.nextInt(20)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }; + + final var orderProcessor = realEthScheduler.createOrderedProcessor(stringProcessor); + IntStream.range(0, numOfItems) + .mapToObj(String::valueOf) + .forEach( + s -> { + orderProcessor.submit(s); + try { + Thread.sleep(random.nextInt(20)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + final List expectedStrings = new ArrayList<>(numOfItems); + IntStream.range(0, numOfItems).mapToObj(String::valueOf).forEach(expectedStrings::add); + + Awaitility.await().until(() -> processedStrings.size() == numOfItems); + + assertThat(processedStrings).containsExactlyElementsOf(expectedStrings); + } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java index c5ca2f3d8a7..1538ecc7278 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java @@ -166,7 +166,7 @@ public void completesWhenPeersAreResponsive() { RespondingEthPeer.blockchainResponder( blockchain, protocolContext.getWorldStateArchive(), transactionPool); final RespondingEthPeer respondingPeer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32); // Setup data to be requested and expected response final T requestedData = generateDataToBeRequested(); @@ -190,7 +190,7 @@ public void completesWhenPeersAreResponsive() { @Test public void doesNotCompleteWhenPeersDoNotRespond() { // Setup a unresponsive peer - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32); // Setup data to be requested final T requestedData = generateDataToBeRequested(); @@ -209,7 +209,7 @@ public void doesNotCompleteWhenPeersDoNotRespond() { @Test public void cancel() { // Setup a unresponsive peer - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32); // Setup data to be requested final T requestedData = generateDataToBeRequested(); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java index 1405fbc24ac..0ed6569387b 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/PeerMessageTaskTest.java @@ -58,7 +58,7 @@ public void completesWhenPeerReturnsPartialResult() { protocolSchedule, 0.5f); final RespondingEthPeer respondingEthPeer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32); // Execute task and wait for response final AtomicReference actualResult = new AtomicReference<>(); @@ -109,7 +109,7 @@ public void completesWhenPeersSendEmptyResponses() { // Setup a unresponsive peer final RespondingEthPeer.Responder responder = RespondingEthPeer.emptyResponder(); final RespondingEthPeer respondingEthPeer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32); // Setup data to be requested final T requestedData = generateDataToBeRequested(); @@ -129,7 +129,7 @@ public void recordsTimeoutAgainstPeerWhenTaskTimesOut() { peersDoTimeout.set(true); // Setup a unresponsive peer final RespondingEthPeer respondingEthPeer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000); + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 32); // Setup data to be requested final T requestedData = generateDataToBeRequested(); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java index a87ca36bfb8..c34b4f0906d 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java @@ -32,7 +32,6 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.nullable; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -100,7 +99,6 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; @@ -233,12 +231,9 @@ public void setUp() { ethProtocolManager = EthProtocolManagerTestUtil.create(); ethContext = spy(ethProtocolManager.ethContext()); - final EthScheduler ethScheduler = mock(EthScheduler.class); + final EthScheduler ethScheduler = spy(ethContext.getScheduler()); syncTaskCapture = ArgumentCaptor.forClass(Runnable.class); doNothing().when(ethScheduler).scheduleSyncWorkerTask(syncTaskCapture.capture()); - doAnswer(invocation -> ((Supplier) invocation.getArguments()[0]).get()) - .when(ethScheduler) - .scheduleServiceTask(any(Supplier.class)); doReturn(ethScheduler).when(ethContext).getScheduler(); peerTransactionTracker = new PeerTransactionTracker(); diff --git a/metrics/core/src/main/java/org/hyperledger/besu/metrics/ReplaceableDoubleSupplier.java b/metrics/core/src/main/java/org/hyperledger/besu/metrics/ReplaceableDoubleSupplier.java new file mode 100644 index 00000000000..5c424180ad9 --- /dev/null +++ b/metrics/core/src/main/java/org/hyperledger/besu/metrics/ReplaceableDoubleSupplier.java @@ -0,0 +1,55 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.metrics; + +import java.util.function.DoubleSupplier; + +/** + * This class provides a replaceable double supplier. It allows to replace the current double + * supplier with a new one. + */ +public class ReplaceableDoubleSupplier implements DoubleSupplier { + private DoubleSupplier currentSupplier; + + /** + * Constructs a new ReplaceableDoubleSupplier with the given initial supplier. + * + * @param currentSupplier the initial double supplier + */ + public ReplaceableDoubleSupplier(final DoubleSupplier currentSupplier) { + this.currentSupplier = currentSupplier; + } + + /** + * Gets a double value from the current supplier. + * + * @return the double value supplied by the current supplier + */ + @Override + public double getAsDouble() { + return currentSupplier.getAsDouble(); + } + + /** + * Replaces the current double supplier with a new one. + * + * @param newSupplier the new double supplier + * @return this ReplaceableDoubleSupplier + */ + public ReplaceableDoubleSupplier replaceDoubleSupplier(final DoubleSupplier newSupplier) { + currentSupplier = newSupplier; + return this; + } +} diff --git a/metrics/core/src/test/java/org/hyperledger/besu/metrics/ReplaceableDoubleSupplierTest.java b/metrics/core/src/test/java/org/hyperledger/besu/metrics/ReplaceableDoubleSupplierTest.java new file mode 100644 index 00000000000..a83c32e163f --- /dev/null +++ b/metrics/core/src/test/java/org/hyperledger/besu/metrics/ReplaceableDoubleSupplierTest.java @@ -0,0 +1,36 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.metrics; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +public class ReplaceableDoubleSupplierTest { + + @Test + public void shouldWorkAsNormalSupplier() { + final var rds = new ReplaceableDoubleSupplier(() -> 1.0); + assertThat(rds.getAsDouble()).isEqualTo(1.0); + } + + @Test + public void shouldReturnValueFromNewSupplierIfReplaced() { + final var rds = new ReplaceableDoubleSupplier(() -> 1.0); + assertThat(rds.getAsDouble()).isEqualTo(1.0); + rds.replaceDoubleSupplier(() -> 2.0); + assertThat(rds.getAsDouble()).isEqualTo(2.0); + } +}