From 29a2699dae16ec21c842891f79434d63049d2e0f Mon Sep 17 00:00:00 2001 From: Alfredo Gutierrez Date: Fri, 31 Jan 2025 16:23:53 -0600 Subject: [PATCH] feat: CN2BN Protocol Communication Part 2 (#558) Signed-off-by: Alfredo Gutierrez --- .../block/server/ack/AckHandlerImpl.java | 27 ++- .../server/ack/AckHandlerInjectionModule.java | 8 +- .../block/server/notifier/NotifierImpl.java | 9 +- .../producer/ProducerBlockItemObserver.java | 187 ++++++++++++++++-- .../block/server/service/ServiceStatus.java | 30 +++ .../server/service/ServiceStatusImpl.java | 24 ++- server/src/main/java/module-info.java | 1 + .../server/manager/AckHandlerImplTest.java | 49 +++-- .../AckHandlerInjectionModuleTest.java | 8 +- .../PbjBlockStreamServiceIntegrationTest.java | 6 +- .../ProducerBlockItemObserverTest.java | 76 +++++++ .../PositiveDataPersistenceTests.java | 2 + 12 files changed, 374 insertions(+), 53 deletions(-) diff --git a/server/src/main/java/com/hedera/block/server/ack/AckHandlerImpl.java b/server/src/main/java/com/hedera/block/server/ack/AckHandlerImpl.java index eada4c24e..5779ec416 100644 --- a/server/src/main/java/com/hedera/block/server/ack/AckHandlerImpl.java +++ b/server/src/main/java/com/hedera/block/server/ack/AckHandlerImpl.java @@ -3,9 +3,12 @@ import com.hedera.block.server.block.BlockInfo; import com.hedera.block.server.notifier.Notifier; +import com.hedera.block.server.persistence.storage.remove.BlockRemover; +import com.hedera.block.server.service.ServiceStatus; import com.hedera.hapi.block.PublishStreamResponseCode; import com.hedera.pbj.runtime.io.buffer.Bytes; import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.inject.Inject; @@ -20,19 +23,29 @@ */ public class AckHandlerImpl implements AckHandler { + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + private final Map blockInfo = new ConcurrentHashMap<>(); private volatile long lastAcknowledgedBlockNumber = -1; private final Notifier notifier; private final boolean skipAcknowledgement; + private final ServiceStatus serviceStatus; + private final BlockRemover blockRemover; /** * Constructor. If either skipPersistence or skipVerification is true, * we ignore all events (no ACKs ever sent). */ @Inject - public AckHandlerImpl(@NonNull Notifier notifier, boolean skipAcknowledgement) { + public AckHandlerImpl( + @NonNull final Notifier notifier, + boolean skipAcknowledgement, + @NonNull final ServiceStatus serviceStatus, + @NonNull final BlockRemover blockRemover) { this.notifier = notifier; this.skipAcknowledgement = skipAcknowledgement; + this.serviceStatus = serviceStatus; + this.blockRemover = blockRemover; } /** @@ -76,8 +89,13 @@ public void blockVerified(long blockNumber, @NonNull Bytes blockHash) { */ @Override public void blockVerificationFailed(long blockNumber) { - notifier.sendEndOfStream(blockNumber, PublishStreamResponseCode.STREAM_ITEMS_BAD_STATE_PROOF); - // TODO We need to notify persistence to delete this block_number. + notifier.sendEndOfStream(lastAcknowledgedBlockNumber, PublishStreamResponseCode.STREAM_ITEMS_BAD_STATE_PROOF); + try { + blockRemover.remove(blockNumber); + } catch (IOException e) { + LOGGER.log(System.Logger.Level.ERROR, "Failed to remove block " + blockNumber, e); + throw new RuntimeException(e); + } } /** @@ -117,6 +135,9 @@ private void attemptAcks() { // Update last acknowledged lastAcknowledgedBlockNumber = nextBlock; + // Update the service status + serviceStatus.setLatestAckedBlock(info); + // Remove from map if desired (so we don't waste memory) blockInfo.remove(nextBlock); } diff --git a/server/src/main/java/com/hedera/block/server/ack/AckHandlerInjectionModule.java b/server/src/main/java/com/hedera/block/server/ack/AckHandlerInjectionModule.java index f84d7c8aa..ee63dc1ad 100644 --- a/server/src/main/java/com/hedera/block/server/ack/AckHandlerInjectionModule.java +++ b/server/src/main/java/com/hedera/block/server/ack/AckHandlerInjectionModule.java @@ -3,6 +3,8 @@ import com.hedera.block.server.notifier.Notifier; import com.hedera.block.server.persistence.storage.PersistenceStorageConfig; +import com.hedera.block.server.persistence.storage.remove.BlockRemover; +import com.hedera.block.server.service.ServiceStatus; import com.hedera.block.server.verification.VerificationConfig; import dagger.Module; import dagger.Provides; @@ -25,11 +27,13 @@ public interface AckHandlerInjectionModule { static AckHandler provideBlockManager( @NonNull final Notifier notifier, @NonNull final PersistenceStorageConfig persistenceStorageConfig, - @NonNull final VerificationConfig verificationConfig) { + @NonNull final VerificationConfig verificationConfig, + @NonNull final ServiceStatus serviceStatus, + @NonNull final BlockRemover blockRemover) { boolean skipPersistence = persistenceStorageConfig.type().equals(PersistenceStorageConfig.StorageType.NO_OP); boolean skipVerification = verificationConfig.type().equals(VerificationConfig.VerificationServiceType.NO_OP); - return new AckHandlerImpl(notifier, skipPersistence | skipVerification); + return new AckHandlerImpl(notifier, skipPersistence | skipVerification, serviceStatus, blockRemover); } } diff --git a/server/src/main/java/com/hedera/block/server/notifier/NotifierImpl.java b/server/src/main/java/com/hedera/block/server/notifier/NotifierImpl.java index 997316734..2edcfe050 100644 --- a/server/src/main/java/com/hedera/block/server/notifier/NotifierImpl.java +++ b/server/src/main/java/com/hedera/block/server/notifier/NotifierImpl.java @@ -107,10 +107,13 @@ public void publish(@NonNull PublishStreamResponse response) { * @return the error stream response */ @NonNull - static PublishStreamResponse buildErrorStreamResponse() { - // TODO: Replace this with a real error enum. + private PublishStreamResponse buildErrorStreamResponse() { + long blockNumber = serviceStatus.getLatestAckedBlock() != null + ? serviceStatus.getLatestAckedBlock().getBlockNumber() + : serviceStatus.getLatestReceivedBlockNumber(); final EndOfStream endOfStream = EndOfStream.newBuilder() - .status(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN) + .status(PublishStreamResponseCode.STREAM_ITEMS_INTERNAL_ERROR) + .blockNumber(blockNumber) .build(); return PublishStreamResponse.newBuilder().status(endOfStream).build(); } diff --git a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java index 45f98e00c..80ba77ffd 100644 --- a/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java +++ b/server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java @@ -6,7 +6,9 @@ import static java.lang.System.Logger; import static java.lang.System.Logger.Level.DEBUG; import static java.lang.System.Logger.Level.ERROR; +import static java.lang.System.Logger.Level.WARNING; +import com.hedera.block.server.block.BlockInfo; import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.consumer.ConsumerConfig; import com.hedera.block.server.events.BlockNodeEventHandler; @@ -16,14 +18,20 @@ import com.hedera.block.server.mediator.SubscriptionHandler; import com.hedera.block.server.metrics.MetricsService; import com.hedera.block.server.service.ServiceStatus; +import com.hedera.hapi.block.Acknowledgement; +import com.hedera.hapi.block.BlockAcknowledgement; import com.hedera.hapi.block.BlockItemUnparsed; import com.hedera.hapi.block.EndOfStream; import com.hedera.hapi.block.PublishStreamResponse; import com.hedera.hapi.block.PublishStreamResponseCode; +import com.hedera.hapi.block.stream.output.BlockHeader; +import com.hedera.pbj.runtime.ParseException; import com.hedera.pbj.runtime.grpc.Pipeline; +import com.hedera.pbj.runtime.io.buffer.Bytes; import edu.umd.cs.findbugs.annotations.NonNull; import java.time.InstantSource; import java.util.List; +import java.util.Optional; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicBoolean; @@ -48,6 +56,8 @@ public class ProducerBlockItemObserver private final LivenessCalculator livenessCalculator; + private boolean allowCurrentBlockStream = false; + /** * Constructor for the ProducerBlockStreamObserver class. It is responsible for calling the * mediator with blocks as they arrive from the upstream producer. It also sends responses back @@ -100,29 +110,38 @@ public void onSubscribe(Flow.Subscription subscription) { @Override public void onNext(@NonNull final List blockItems) { - LOGGER.log(DEBUG, "Received PublishStreamRequest from producer with " + blockItems.size() + " BlockItems."); - if (blockItems.isEmpty()) { - return; - } - - metricsService.get(LiveBlockItemsReceived).add(blockItems.size()); + try { + LOGGER.log(DEBUG, "Received PublishStreamRequest from producer with " + blockItems.size() + " BlockItems."); + if (blockItems.isEmpty()) { + return; + } - // Publish the block to all the subscribers unless - // there's an issue with the StreamMediator. - if (serviceStatus.isRunning()) { - // Refresh the producer liveness - livenessCalculator.refresh(); + metricsService.get(LiveBlockItemsReceived).add(blockItems.size()); - // Publish the block to the mediator - publisher.publish(blockItems); + // Publish the block to all the subscribers unless + // there's an issue with the StreamMediator. + if (serviceStatus.isRunning()) { + // Refresh the producer liveness + livenessCalculator.refresh(); - } else { - LOGGER.log(ERROR, getClass().getName() + " is not accepting BlockItems"); - stopProcessing(); + // pre-check for valid block + if (preCheck(blockItems)) { + // Publish the block to the mediator + publisher.publish(blockItems); + } + } else { + LOGGER.log(ERROR, getClass().getName() + " is not accepting BlockItems"); + stopProcessing(); - // Close the upstream connection to the producer(s) + // Close the upstream connection to the producer(s) + publishStreamResponseObserver.onNext(buildErrorStreamResponse()); + LOGGER.log(ERROR, "Error PublishStreamResponse sent to upstream producer"); + } + } catch (Exception e) { + LOGGER.log(ERROR, "Error processing block items", e); publishStreamResponseObserver.onNext(buildErrorStreamResponse()); - LOGGER.log(ERROR, "Error PublishStreamResponse sent to upstream producer"); + // should we halt processing? + stopProcessing(); } } @@ -142,10 +161,13 @@ public void onEvent(ObjectEvent event, long sequence, boo } @NonNull - private static PublishStreamResponse buildErrorStreamResponse() { - // TODO: Replace this with a real error enum. + private PublishStreamResponse buildErrorStreamResponse() { + long blockNumber = serviceStatus.getLatestAckedBlock() != null + ? serviceStatus.getLatestAckedBlock().getBlockNumber() + : serviceStatus.getLatestReceivedBlockNumber(); final EndOfStream endOfStream = EndOfStream.newBuilder() - .status(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN) + .blockNumber(blockNumber) + .status(PublishStreamResponseCode.STREAM_ITEMS_INTERNAL_ERROR) .build(); return PublishStreamResponse.newBuilder().status(endOfStream).build(); } @@ -188,4 +210,127 @@ private void stopProcessing() { isResponsePermitted.set(false); subscriptionHandler.unsubscribe(this); } + + /** + * Pre-check for valid block, if the block is a duplicate or future block, we don't stream to the Ring Buffer. + * @param blockItems the list of block items + * @return true if the block should stream forward to RB otherwise false + */ + private boolean preCheck(@NonNull final List blockItems) { + + // we only check if is the start of a new block. + BlockItemUnparsed firstItem = blockItems.getFirst(); + if (!firstItem.hasBlockHeader()) { + return allowCurrentBlockStream; + } + + final long nextBlockNumber = attemptParseBlockHeaderNumber(firstItem); + final long nextExpectedBlockNumber = serviceStatus.getLatestReceivedBlockNumber() + 1; + + // temporary workaround so it always allows the first block at startup + if (nextExpectedBlockNumber == 1) { + allowCurrentBlockStream = true; + return true; + } + + // duplicate block + if (nextBlockNumber < nextExpectedBlockNumber) { + // we don't stream to the RB until we check a new blockHeader for the expected block + allowCurrentBlockStream = false; + LOGGER.log( + WARNING, + "Received a duplicate block, received_block_number: {0}, expected_block_number: {1}", + nextBlockNumber, + nextExpectedBlockNumber); + + notifyOfDuplicateBlock(nextBlockNumber); + } + + // future non-immediate block + if (nextBlockNumber > nextExpectedBlockNumber) { + // we don't stream to the RB until we check a new blockHeader for the expected block + allowCurrentBlockStream = false; + LOGGER.log( + WARNING, + "Received a future block, received_block_number: {0}, expected_block_number: {1}", + nextBlockNumber, + nextExpectedBlockNumber); + + notifyOfFutureBlock(serviceStatus.getLatestAckedBlock().getBlockNumber()); + } + + // if block number is neither duplicate nor future (should be the same as expected) + // we allow the stream of subsequent batches + allowCurrentBlockStream = true; + // we also allow the batch that contains the block_header + return true; + } + + /** + * Parse the block header number from the given block item. + * If the block header is not parsable, log the error and throw a runtime exception. + * This is necessary to wrap the checked exception that is thrown by the parse method. + * */ + private long attemptParseBlockHeaderNumber(@NonNull final BlockItemUnparsed blockItem) { + try { + return BlockHeader.PROTOBUF.parse(blockItem.blockHeader()).number(); + } catch (ParseException e) { + LOGGER.log(ERROR, "Error parsing block header", e); + throw new RuntimeException(e); + } + } + + /** + * Get the block hash for the given block number, only if is the latest acked block. + * otherwise Empty + * + * @param blockNumber the block number + * @return a promise of block hash if it exists + */ + private Optional getBlockHash(final long blockNumber) { + final BlockInfo latestAckedBlockNumber = serviceStatus.getLatestAckedBlock(); + if (latestAckedBlockNumber.getBlockNumber() == blockNumber) { + return Optional.of(latestAckedBlockNumber.getBlockHash()); + } + // if the block is older than the latest acked block, we don't have the hash on hand + return Optional.empty(); + } + + /** + * Notify the producer of a future block that was not expected was received. + * + * @param currentBlock the current block number that is persisted and verified. + */ + private void notifyOfFutureBlock(final long currentBlock) { + final EndOfStream endOfStream = EndOfStream.newBuilder() + .status(PublishStreamResponseCode.STREAM_ITEMS_BEHIND) + .blockNumber(currentBlock) + .build(); + + final PublishStreamResponse publishStreamResponse = + PublishStreamResponse.newBuilder().status(endOfStream).build(); + + publishStreamResponseObserver.onNext(publishStreamResponse); + } + + /** + * Notify the producer of a duplicate block that was received. + * + * @param duplicateBlockNumber the block number that was received and is a duplicate + */ + private void notifyOfDuplicateBlock(final long duplicateBlockNumber) { + final BlockAcknowledgement blockAcknowledgement = BlockAcknowledgement.newBuilder() + .blockAlreadyExists(true) + .blockNumber(duplicateBlockNumber) + .blockRootHash(getBlockHash(duplicateBlockNumber).orElse(Bytes.EMPTY)) + .build(); + + final PublishStreamResponse publishStreamResponse = PublishStreamResponse.newBuilder() + .acknowledgement(Acknowledgement.newBuilder() + .blockAck(blockAcknowledgement) + .build()) + .build(); + + publishStreamResponseObserver.onNext(publishStreamResponse); + } } diff --git a/server/src/main/java/com/hedera/block/server/service/ServiceStatus.java b/server/src/main/java/com/hedera/block/server/service/ServiceStatus.java index 464eeffe2..d49426b84 100644 --- a/server/src/main/java/com/hedera/block/server/service/ServiceStatus.java +++ b/server/src/main/java/com/hedera/block/server/service/ServiceStatus.java @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 package com.hedera.block.server.service; +import com.hedera.block.server.block.BlockInfo; import edu.umd.cs.findbugs.annotations.NonNull; import io.helidon.webserver.WebServer; @@ -38,4 +39,33 @@ public interface ServiceStatus { * @param className the name of the class stopping the service */ void stopWebServer(final String className); + + /** + * Gets the latest acked block number. + * + * @return the latest acked block number + */ + BlockInfo getLatestAckedBlock(); + + /** + * Sets the latest acked block number. + * + * @param latestAckedBlockInfo the latest acked block number + */ + void setLatestAckedBlock(BlockInfo latestAckedBlockInfo); + + /** + * Gets the latest received block number, when ack is skipped it might be used instead of last acked block number. + * Also, if persistence + verification is in progress, it might be used to check if the block is already received. + * + * @return the latest received block number + */ + long getLatestReceivedBlockNumber(); + + /** + * Sets the latest received block number. should be set when a block_header is received and before the first batch is placed on the ring buffer. + * + * @param latestReceivedBlockNumber the latest received block number + */ + void setLatestReceivedBlockNumber(long latestReceivedBlockNumber); } diff --git a/server/src/main/java/com/hedera/block/server/service/ServiceStatusImpl.java b/server/src/main/java/com/hedera/block/server/service/ServiceStatusImpl.java index 499fee221..0cc7bc3ed 100644 --- a/server/src/main/java/com/hedera/block/server/service/ServiceStatusImpl.java +++ b/server/src/main/java/com/hedera/block/server/service/ServiceStatusImpl.java @@ -4,6 +4,7 @@ import static java.lang.System.Logger.Level.DEBUG; import static java.lang.System.Logger.Level.ERROR; +import com.hedera.block.server.block.BlockInfo; import com.hedera.block.server.config.BlockNodeContext; import edu.umd.cs.findbugs.annotations.NonNull; import io.helidon.webserver.WebServer; @@ -22,7 +23,8 @@ public class ServiceStatusImpl implements ServiceStatus { private final AtomicBoolean isRunning = new AtomicBoolean(true); private WebServer webServer; - + private volatile BlockInfo latestAckedBlock; + private volatile long latestReceivedBlockNumber; private final int delayMillis; /** @@ -92,4 +94,24 @@ public void stopWebServer(@NonNull final String className) { // Stop the web server webServer.stop(); } + + @Override + public BlockInfo getLatestAckedBlock() { + return latestAckedBlock; + } + + @Override + public void setLatestAckedBlock(BlockInfo latestAckedBlock) { + this.latestAckedBlock = latestAckedBlock; + } + + @Override + public long getLatestReceivedBlockNumber() { + return latestReceivedBlockNumber; + } + + @Override + public void setLatestReceivedBlockNumber(long latestReceivedBlockNumber) { + this.latestReceivedBlockNumber = latestReceivedBlockNumber; + } } diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index 55d1fee65..0ae016b17 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -29,6 +29,7 @@ exports com.hedera.block.server.verification.session; exports com.hedera.block.server.verification.signature; exports com.hedera.block.server.verification.service; + exports com.hedera.block.server.block; requires com.hedera.block.common; requires com.hedera.block.stream; diff --git a/server/src/test/java/com/hedera/block/server/manager/AckHandlerImplTest.java b/server/src/test/java/com/hedera/block/server/manager/AckHandlerImplTest.java index fe24d5630..cde260c39 100644 --- a/server/src/test/java/com/hedera/block/server/manager/AckHandlerImplTest.java +++ b/server/src/test/java/com/hedera/block/server/manager/AckHandlerImplTest.java @@ -11,6 +11,8 @@ import com.hedera.block.server.ack.AckHandlerImpl; import com.hedera.block.server.notifier.Notifier; +import com.hedera.block.server.persistence.storage.remove.BlockRemover; +import com.hedera.block.server.service.ServiceStatus; import com.hedera.hapi.block.PublishStreamResponseCode; import com.hedera.pbj.runtime.io.buffer.Bytes; import java.util.List; @@ -18,24 +20,33 @@ import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mock; class AckHandlerImplTest { private Notifier notifier; - private AckHandlerImpl blockManager; + private AckHandlerImpl ackHandler; + + @Mock + private ServiceStatus serviceStatus; + + @Mock + private BlockRemover blockRemover; @BeforeEach void setUp() { notifier = mock(Notifier.class); + blockRemover = mock(BlockRemover.class); + serviceStatus = mock(ServiceStatus.class); // By default, we do NOT skip acknowledgements - blockManager = new AckHandlerImpl(notifier, false); + ackHandler = new AckHandlerImpl(notifier, false, serviceStatus, blockRemover); } @Test @DisplayName("blockVerified + blockPersisted should do nothing if skipAcknowledgement == true") void blockVerified_skippedAcknowledgement() { // given - AckHandlerImpl managerWithSkip = new AckHandlerImpl(notifier, true); + AckHandlerImpl managerWithSkip = new AckHandlerImpl(notifier, true, serviceStatus, blockRemover); // when managerWithSkip.blockVerified(1L, Bytes.wrap("somehash".getBytes())); @@ -50,10 +61,10 @@ void blockVerified_skippedAcknowledgement() { @DisplayName("blockVerificationFailed should send end-of-stream message with appropriate code") void blockVerificationFailed_sendsEndOfStream() { // when - blockManager.blockVerificationFailed(5L); + ackHandler.blockVerificationFailed(2L); // then - verify(notifier, times(1)).sendEndOfStream(5L, PublishStreamResponseCode.STREAM_ITEMS_BAD_STATE_PROOF); + verify(notifier, times(1)).sendEndOfStream(-1L, PublishStreamResponseCode.STREAM_ITEMS_BAD_STATE_PROOF); verifyNoMoreInteractions(notifier); } @@ -61,7 +72,7 @@ void blockVerificationFailed_sendsEndOfStream() { @DisplayName("blockPersisted alone does not ACK") void blockPersisted_thenNoAckWithoutVerification() { // when - blockManager.blockPersisted(1L); + ackHandler.blockPersisted(1L); // then // We have not verified the block, so no ACK is sent @@ -72,7 +83,7 @@ void blockPersisted_thenNoAckWithoutVerification() { @DisplayName("blockVerified alone does not ACK") void blockVerified_thenNoAckWithoutPersistence() { // when - blockManager.blockVerified(1L, Bytes.wrap("hash1".getBytes())); + ackHandler.blockVerified(1L, Bytes.wrap("hash1".getBytes())); // then verifyNoInteractions(notifier); @@ -86,8 +97,8 @@ void blockPersistedThenBlockVerified_triggersAck() { Bytes blockHash = Bytes.wrap("hash1".getBytes()); // when - blockManager.blockPersisted(blockNumber); - blockManager.blockVerified(blockNumber, blockHash); + ackHandler.blockPersisted(blockNumber); + ackHandler.blockVerified(blockNumber, blockHash); // then // We expect a single ACK for block #1 @@ -109,16 +120,16 @@ void multipleBlocksAckInSequence() { // when // Mark block1 persisted and verified - blockManager.blockPersisted(block1); - blockManager.blockVerified(block1, hash1); + ackHandler.blockPersisted(block1); + ackHandler.blockVerified(block1, hash1); // Mark block2 persisted and verified - blockManager.blockPersisted(block2); - blockManager.blockVerified(block2, hash2); + ackHandler.blockPersisted(block2); + ackHandler.blockVerified(block2, hash2); // Mark block3 persisted and verified - blockManager.blockPersisted(block3); - blockManager.blockVerified(block3, hash3); + ackHandler.blockPersisted(block3); + ackHandler.blockVerified(block3, hash3); // then // The manager should ACK blocks in ascending order (1,2,3). @@ -154,11 +165,11 @@ void ackStopsIfNextBlockIsNotReady() { // when // Fully persist & verify block #10 -> Should ACK - blockManager.blockPersisted(block1); - blockManager.blockVerified(block1, hash1); + ackHandler.blockPersisted(block1); + ackHandler.blockVerified(block1, hash1); // Partially persist block #11 - blockManager.blockPersisted(block2); + ackHandler.blockPersisted(block2); // We do NOT verify block #11 yet // then @@ -167,7 +178,7 @@ void ackStopsIfNextBlockIsNotReady() { verifyNoMoreInteractions(notifier); // Now verify block #11 - blockManager.blockVerified(block2, hash2); + ackHandler.blockVerified(block2, hash2); // Expect the second ACK verify(notifier, times(1)).sendAck(eq(block2), eq(hash2), eq(false)); diff --git a/server/src/test/java/com/hedera/block/server/manager/AckHandlerInjectionModuleTest.java b/server/src/test/java/com/hedera/block/server/manager/AckHandlerInjectionModuleTest.java index faa0f2ca4..091815e24 100644 --- a/server/src/test/java/com/hedera/block/server/manager/AckHandlerInjectionModuleTest.java +++ b/server/src/test/java/com/hedera/block/server/manager/AckHandlerInjectionModuleTest.java @@ -10,6 +10,8 @@ import com.hedera.block.server.ack.AckHandlerInjectionModule; import com.hedera.block.server.notifier.Notifier; import com.hedera.block.server.persistence.storage.PersistenceStorageConfig; +import com.hedera.block.server.persistence.storage.remove.BlockRemover; +import com.hedera.block.server.service.ServiceStatus; import com.hedera.block.server.verification.VerificationConfig; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -22,6 +24,8 @@ class AckHandlerInjectionModuleTest { void testProvideBlockManager() { // given Notifier notifier = mock(Notifier.class); + ServiceStatus serviceStatus = mock(ServiceStatus.class); + BlockRemover blockRemover = mock(BlockRemover.class); PersistenceStorageConfig persistenceStorageConfig = new PersistenceStorageConfig( "", "", @@ -34,8 +38,8 @@ void testProvideBlockManager() { when(verificationConfig.type()).thenReturn(VerificationConfig.VerificationServiceType.PRODUCTION); // when - AckHandler ackHandler = - AckHandlerInjectionModule.provideBlockManager(notifier, persistenceStorageConfig, verificationConfig); + AckHandler ackHandler = AckHandlerInjectionModule.provideBlockManager( + notifier, persistenceStorageConfig, verificationConfig, serviceStatus, blockRemover); // then // AckHandlerImpl is the default and only implementation diff --git a/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java b/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java index eeb6443c5..6bda41da2 100644 --- a/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java +++ b/server/src/test/java/com/hedera/block/server/pbj/PbjBlockStreamServiceIntegrationTest.java @@ -678,7 +678,8 @@ private static Bytes buildSubscribeStreamResponse(BlockItemUnparsed blockItem) { private static Bytes buildEndOfStreamResponse() { final EndOfStream endOfStream = EndOfStream.newBuilder() - .status(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN) + .status(PublishStreamResponseCode.STREAM_ITEMS_INTERNAL_ERROR) + .blockNumber(0L) .build(); return PublishStreamResponse.PROTOBUF.toBytes( PublishStreamResponse.newBuilder().status(endOfStream).build()); @@ -696,10 +697,11 @@ private BlockVerificationSessionFactory getBlockVerificationSessionFactory() { private PbjBlockStreamServiceProxy buildBlockStreamService( final BlockWriter, Long> blockWriter) { + final BlockRemover blockRemover = mock(BlockRemover.class); final ServiceStatus serviceStatus = new ServiceStatusImpl(blockNodeContext); final var streamMediator = buildStreamMediator(new ConcurrentHashMap<>(32), serviceStatus); final var notifier = new NotifierImpl(streamMediator, blockNodeContext, serviceStatus); - final var blockManager = new AckHandlerImpl(notifier, false); + final var blockManager = new AckHandlerImpl(notifier, false, serviceStatus, blockRemover); final var blockVerificationSessionFactory = getBlockVerificationSessionFactory(); final var BlockVerificationService = new BlockVerificationServiceImpl( diff --git a/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java b/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java index b62bc83bb..90d6d592b 100644 --- a/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java +++ b/server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java @@ -2,12 +2,14 @@ package com.hedera.block.server.producer; import static com.hedera.block.server.util.PersistTestUtils.generateBlockItemsUnparsed; +import static com.hedera.block.server.util.PersistTestUtils.generateBlockItemsUnparsedForWithBlockNumber; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.hedera.block.server.block.BlockInfo; import com.hedera.block.server.config.BlockNodeContext; import com.hedera.block.server.events.ObjectEvent; import com.hedera.block.server.mediator.Publisher; @@ -15,14 +17,20 @@ import com.hedera.block.server.service.ServiceStatus; import com.hedera.block.server.service.ServiceStatusImpl; import com.hedera.block.server.util.TestConfigUtil; +import com.hedera.hapi.block.Acknowledgement; +import com.hedera.hapi.block.BlockAcknowledgement; import com.hedera.hapi.block.BlockItemUnparsed; +import com.hedera.hapi.block.EndOfStream; import com.hedera.hapi.block.PublishStreamResponse; +import com.hedera.hapi.block.PublishStreamResponseCode; import com.hedera.pbj.runtime.grpc.Pipeline; +import com.hedera.pbj.runtime.io.buffer.Bytes; import java.io.IOException; import java.time.InstantSource; import java.util.List; import java.util.Map; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -147,4 +155,72 @@ public void testClientEndStreamReceived() { // Confirm that the observer was unsubscribed verify(subscriptionHandler, timeout(testTimeout).times(1)).unsubscribe(producerBlockItemObserver); } + + @Test + @DisplayName("Test duplicate block items") + public void testDuplicateBlockReceived() { + + // given + when(serviceStatus.isRunning()).thenReturn(true); + long latestAckedBlockNumber = 10L; + Bytes fakeHash = Bytes.wrap("fake_hash"); + BlockInfo latestAckedBlock = new BlockInfo(latestAckedBlockNumber); + latestAckedBlock.setBlockHash(fakeHash); + when(serviceStatus.getLatestAckedBlock()).thenReturn(latestAckedBlock); + when(serviceStatus.getLatestReceivedBlockNumber()).thenReturn(latestAckedBlockNumber); + + final List blockItems = generateBlockItemsUnparsedForWithBlockNumber(10); + final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( + testClock, publisher, subscriptionHandler, helidonPublishPipeline, testContext, serviceStatus); + + // when + producerBlockItemObserver.onNext(blockItems); + + // then + final BlockAcknowledgement blockAcknowledgement = BlockAcknowledgement.newBuilder() + .blockNumber(10L) + .blockAlreadyExists(true) + .blockRootHash(fakeHash) + .build(); + + final Acknowledgement acknowledgement = + Acknowledgement.newBuilder().blockAck(blockAcknowledgement).build(); + + final PublishStreamResponse publishStreamResponse = PublishStreamResponse.newBuilder() + .acknowledgement(acknowledgement) + .build(); + + // verify helidonPublishPipeline.onNext() is called once with publishStreamResponse + verify(helidonPublishPipeline, timeout(testTimeout).times(1)).onNext(publishStreamResponse); + } + + @Test + @DisplayName("Test future (ahead of expected) block received") + public void testFutureBlockReceived() { + // given + when(serviceStatus.isRunning()).thenReturn(true); + long latestAckedBlockNumber = 10L; + Bytes fakeHash = Bytes.wrap("fake_hash"); + BlockInfo latestAckedBlock = new BlockInfo(latestAckedBlockNumber); + latestAckedBlock.setBlockHash(fakeHash); + when(serviceStatus.getLatestAckedBlock()).thenReturn(latestAckedBlock); + when(serviceStatus.getLatestReceivedBlockNumber()).thenReturn(10L); + final List blockItems = generateBlockItemsUnparsedForWithBlockNumber(12); + final ProducerBlockItemObserver producerBlockItemObserver = new ProducerBlockItemObserver( + testClock, publisher, subscriptionHandler, helidonPublishPipeline, testContext, serviceStatus); + + // when + producerBlockItemObserver.onNext(blockItems); + + // then + final EndOfStream endOfStream = EndOfStream.newBuilder() + .status(PublishStreamResponseCode.STREAM_ITEMS_BEHIND) + .blockNumber(10L) + .build(); + final PublishStreamResponse publishStreamResponse = + PublishStreamResponse.newBuilder().status(endOfStream).build(); + + // verify helidonPublishPipeline.onNext() is called once with publishStreamResponse + verify(helidonPublishPipeline, timeout(testTimeout).times(1)).onNext(publishStreamResponse); + } } diff --git a/suites/src/main/java/com/hedera/block/suites/persistence/positive/PositiveDataPersistenceTests.java b/suites/src/main/java/com/hedera/block/suites/persistence/positive/PositiveDataPersistenceTests.java index c5f5b44d4..7a9ae6da8 100644 --- a/suites/src/main/java/com/hedera/block/suites/persistence/positive/PositiveDataPersistenceTests.java +++ b/suites/src/main/java/com/hedera/block/suites/persistence/positive/PositiveDataPersistenceTests.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.util.concurrent.Future; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.testcontainers.containers.Container; @@ -46,6 +47,7 @@ void teardownEnvironment() { * commands */ @Test + @Disabled("Needs simulator to be updated with blocks that pass verification @todo(502) @todo(175)") public void verifyBlockDataSavedInCorrectDirectory() throws InterruptedException, IOException { String savedBlocksFolderBefore = getContainerCommandResult(GET_BLOCKS_COMMAND); int savedBlocksCountBefore = getSavedBlocksCount(savedBlocksFolderBefore);