Skip to content

Commit

Permalink
feat: CN2BN Protocol Communication Part 2 (#558)
Browse files Browse the repository at this point in the history
Signed-off-by: Alfredo Gutierrez <[email protected]>
  • Loading branch information
AlfredoG87 authored Jan 31, 2025
1 parent 60ffd34 commit 29a2699
Show file tree
Hide file tree
Showing 12 changed files with 374 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@

import com.hedera.block.server.block.BlockInfo;
import com.hedera.block.server.notifier.Notifier;
import com.hedera.block.server.persistence.storage.remove.BlockRemover;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.hapi.block.PublishStreamResponseCode;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.inject.Inject;
Expand All @@ -20,19 +23,29 @@
*/
public class AckHandlerImpl implements AckHandler {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final Map<Long, BlockInfo> blockInfo = new ConcurrentHashMap<>();
private volatile long lastAcknowledgedBlockNumber = -1;
private final Notifier notifier;
private final boolean skipAcknowledgement;
private final ServiceStatus serviceStatus;
private final BlockRemover blockRemover;

/**
* Constructor. If either skipPersistence or skipVerification is true,
* we ignore all events (no ACKs ever sent).
*/
@Inject
public AckHandlerImpl(@NonNull Notifier notifier, boolean skipAcknowledgement) {
public AckHandlerImpl(
@NonNull final Notifier notifier,
boolean skipAcknowledgement,
@NonNull final ServiceStatus serviceStatus,
@NonNull final BlockRemover blockRemover) {
this.notifier = notifier;
this.skipAcknowledgement = skipAcknowledgement;
this.serviceStatus = serviceStatus;
this.blockRemover = blockRemover;
}

/**
Expand Down Expand Up @@ -76,8 +89,13 @@ public void blockVerified(long blockNumber, @NonNull Bytes blockHash) {
*/
@Override
public void blockVerificationFailed(long blockNumber) {
notifier.sendEndOfStream(blockNumber, PublishStreamResponseCode.STREAM_ITEMS_BAD_STATE_PROOF);
// TODO We need to notify persistence to delete this block_number.
notifier.sendEndOfStream(lastAcknowledgedBlockNumber, PublishStreamResponseCode.STREAM_ITEMS_BAD_STATE_PROOF);
try {
blockRemover.remove(blockNumber);
} catch (IOException e) {
LOGGER.log(System.Logger.Level.ERROR, "Failed to remove block " + blockNumber, e);
throw new RuntimeException(e);
}
}

/**
Expand Down Expand Up @@ -117,6 +135,9 @@ private void attemptAcks() {
// Update last acknowledged
lastAcknowledgedBlockNumber = nextBlock;

// Update the service status
serviceStatus.setLatestAckedBlock(info);

// Remove from map if desired (so we don't waste memory)
blockInfo.remove(nextBlock);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import com.hedera.block.server.notifier.Notifier;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.persistence.storage.remove.BlockRemover;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.block.server.verification.VerificationConfig;
import dagger.Module;
import dagger.Provides;
Expand All @@ -25,11 +27,13 @@ public interface AckHandlerInjectionModule {
static AckHandler provideBlockManager(
@NonNull final Notifier notifier,
@NonNull final PersistenceStorageConfig persistenceStorageConfig,
@NonNull final VerificationConfig verificationConfig) {
@NonNull final VerificationConfig verificationConfig,
@NonNull final ServiceStatus serviceStatus,
@NonNull final BlockRemover blockRemover) {

boolean skipPersistence = persistenceStorageConfig.type().equals(PersistenceStorageConfig.StorageType.NO_OP);
boolean skipVerification = verificationConfig.type().equals(VerificationConfig.VerificationServiceType.NO_OP);

return new AckHandlerImpl(notifier, skipPersistence | skipVerification);
return new AckHandlerImpl(notifier, skipPersistence | skipVerification, serviceStatus, blockRemover);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,13 @@ public void publish(@NonNull PublishStreamResponse response) {
* @return the error stream response
*/
@NonNull
static PublishStreamResponse buildErrorStreamResponse() {
// TODO: Replace this with a real error enum.
private PublishStreamResponse buildErrorStreamResponse() {
long blockNumber = serviceStatus.getLatestAckedBlock() != null
? serviceStatus.getLatestAckedBlock().getBlockNumber()
: serviceStatus.getLatestReceivedBlockNumber();
final EndOfStream endOfStream = EndOfStream.newBuilder()
.status(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN)
.status(PublishStreamResponseCode.STREAM_ITEMS_INTERNAL_ERROR)
.blockNumber(blockNumber)
.build();
return PublishStreamResponse.newBuilder().status(endOfStream).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import static java.lang.System.Logger;
import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.ERROR;
import static java.lang.System.Logger.Level.WARNING;

import com.hedera.block.server.block.BlockInfo;
import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.consumer.ConsumerConfig;
import com.hedera.block.server.events.BlockNodeEventHandler;
Expand All @@ -16,14 +18,20 @@
import com.hedera.block.server.mediator.SubscriptionHandler;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.hapi.block.Acknowledgement;
import com.hedera.hapi.block.BlockAcknowledgement;
import com.hedera.hapi.block.BlockItemUnparsed;
import com.hedera.hapi.block.EndOfStream;
import com.hedera.hapi.block.PublishStreamResponse;
import com.hedera.hapi.block.PublishStreamResponseCode;
import com.hedera.hapi.block.stream.output.BlockHeader;
import com.hedera.pbj.runtime.ParseException;
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.InstantSource;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -48,6 +56,8 @@ public class ProducerBlockItemObserver

private final LivenessCalculator livenessCalculator;

private boolean allowCurrentBlockStream = false;

/**
* Constructor for the ProducerBlockStreamObserver class. It is responsible for calling the
* mediator with blocks as they arrive from the upstream producer. It also sends responses back
Expand Down Expand Up @@ -100,29 +110,38 @@ public void onSubscribe(Flow.Subscription subscription) {
@Override
public void onNext(@NonNull final List<BlockItemUnparsed> blockItems) {

LOGGER.log(DEBUG, "Received PublishStreamRequest from producer with " + blockItems.size() + " BlockItems.");
if (blockItems.isEmpty()) {
return;
}

metricsService.get(LiveBlockItemsReceived).add(blockItems.size());
try {
LOGGER.log(DEBUG, "Received PublishStreamRequest from producer with " + blockItems.size() + " BlockItems.");
if (blockItems.isEmpty()) {
return;
}

// Publish the block to all the subscribers unless
// there's an issue with the StreamMediator.
if (serviceStatus.isRunning()) {
// Refresh the producer liveness
livenessCalculator.refresh();
metricsService.get(LiveBlockItemsReceived).add(blockItems.size());

// Publish the block to the mediator
publisher.publish(blockItems);
// Publish the block to all the subscribers unless
// there's an issue with the StreamMediator.
if (serviceStatus.isRunning()) {
// Refresh the producer liveness
livenessCalculator.refresh();

} else {
LOGGER.log(ERROR, getClass().getName() + " is not accepting BlockItems");
stopProcessing();
// pre-check for valid block
if (preCheck(blockItems)) {
// Publish the block to the mediator
publisher.publish(blockItems);
}
} else {
LOGGER.log(ERROR, getClass().getName() + " is not accepting BlockItems");
stopProcessing();

// Close the upstream connection to the producer(s)
// Close the upstream connection to the producer(s)
publishStreamResponseObserver.onNext(buildErrorStreamResponse());
LOGGER.log(ERROR, "Error PublishStreamResponse sent to upstream producer");
}
} catch (Exception e) {
LOGGER.log(ERROR, "Error processing block items", e);
publishStreamResponseObserver.onNext(buildErrorStreamResponse());
LOGGER.log(ERROR, "Error PublishStreamResponse sent to upstream producer");
// should we halt processing?
stopProcessing();
}
}

Expand All @@ -142,10 +161,13 @@ public void onEvent(ObjectEvent<PublishStreamResponse> event, long sequence, boo
}

@NonNull
private static PublishStreamResponse buildErrorStreamResponse() {
// TODO: Replace this with a real error enum.
private PublishStreamResponse buildErrorStreamResponse() {
long blockNumber = serviceStatus.getLatestAckedBlock() != null
? serviceStatus.getLatestAckedBlock().getBlockNumber()
: serviceStatus.getLatestReceivedBlockNumber();
final EndOfStream endOfStream = EndOfStream.newBuilder()
.status(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN)
.blockNumber(blockNumber)
.status(PublishStreamResponseCode.STREAM_ITEMS_INTERNAL_ERROR)
.build();
return PublishStreamResponse.newBuilder().status(endOfStream).build();
}
Expand Down Expand Up @@ -188,4 +210,127 @@ private void stopProcessing() {
isResponsePermitted.set(false);
subscriptionHandler.unsubscribe(this);
}

/**
* Pre-check for valid block, if the block is a duplicate or future block, we don't stream to the Ring Buffer.
* @param blockItems the list of block items
* @return true if the block should stream forward to RB otherwise false
*/
private boolean preCheck(@NonNull final List<BlockItemUnparsed> blockItems) {

// we only check if is the start of a new block.
BlockItemUnparsed firstItem = blockItems.getFirst();
if (!firstItem.hasBlockHeader()) {
return allowCurrentBlockStream;
}

final long nextBlockNumber = attemptParseBlockHeaderNumber(firstItem);
final long nextExpectedBlockNumber = serviceStatus.getLatestReceivedBlockNumber() + 1;

// temporary workaround so it always allows the first block at startup
if (nextExpectedBlockNumber == 1) {
allowCurrentBlockStream = true;
return true;
}

// duplicate block
if (nextBlockNumber < nextExpectedBlockNumber) {
// we don't stream to the RB until we check a new blockHeader for the expected block
allowCurrentBlockStream = false;
LOGGER.log(
WARNING,
"Received a duplicate block, received_block_number: {0}, expected_block_number: {1}",
nextBlockNumber,
nextExpectedBlockNumber);

notifyOfDuplicateBlock(nextBlockNumber);
}

// future non-immediate block
if (nextBlockNumber > nextExpectedBlockNumber) {
// we don't stream to the RB until we check a new blockHeader for the expected block
allowCurrentBlockStream = false;
LOGGER.log(
WARNING,
"Received a future block, received_block_number: {0}, expected_block_number: {1}",
nextBlockNumber,
nextExpectedBlockNumber);

notifyOfFutureBlock(serviceStatus.getLatestAckedBlock().getBlockNumber());
}

// if block number is neither duplicate nor future (should be the same as expected)
// we allow the stream of subsequent batches
allowCurrentBlockStream = true;
// we also allow the batch that contains the block_header
return true;
}

/**
* Parse the block header number from the given block item.
* If the block header is not parsable, log the error and throw a runtime exception.
* This is necessary to wrap the checked exception that is thrown by the parse method.
* */
private long attemptParseBlockHeaderNumber(@NonNull final BlockItemUnparsed blockItem) {
try {
return BlockHeader.PROTOBUF.parse(blockItem.blockHeader()).number();
} catch (ParseException e) {
LOGGER.log(ERROR, "Error parsing block header", e);
throw new RuntimeException(e);
}
}

/**
* Get the block hash for the given block number, only if is the latest acked block.
* otherwise Empty
*
* @param blockNumber the block number
* @return a promise of block hash if it exists
*/
private Optional<Bytes> getBlockHash(final long blockNumber) {
final BlockInfo latestAckedBlockNumber = serviceStatus.getLatestAckedBlock();
if (latestAckedBlockNumber.getBlockNumber() == blockNumber) {
return Optional.of(latestAckedBlockNumber.getBlockHash());
}
// if the block is older than the latest acked block, we don't have the hash on hand
return Optional.empty();
}

/**
* Notify the producer of a future block that was not expected was received.
*
* @param currentBlock the current block number that is persisted and verified.
*/
private void notifyOfFutureBlock(final long currentBlock) {
final EndOfStream endOfStream = EndOfStream.newBuilder()
.status(PublishStreamResponseCode.STREAM_ITEMS_BEHIND)
.blockNumber(currentBlock)
.build();

final PublishStreamResponse publishStreamResponse =
PublishStreamResponse.newBuilder().status(endOfStream).build();

publishStreamResponseObserver.onNext(publishStreamResponse);
}

/**
* Notify the producer of a duplicate block that was received.
*
* @param duplicateBlockNumber the block number that was received and is a duplicate
*/
private void notifyOfDuplicateBlock(final long duplicateBlockNumber) {
final BlockAcknowledgement blockAcknowledgement = BlockAcknowledgement.newBuilder()
.blockAlreadyExists(true)
.blockNumber(duplicateBlockNumber)
.blockRootHash(getBlockHash(duplicateBlockNumber).orElse(Bytes.EMPTY))
.build();

final PublishStreamResponse publishStreamResponse = PublishStreamResponse.newBuilder()
.acknowledgement(Acknowledgement.newBuilder()
.blockAck(blockAcknowledgement)
.build())
.build();

publishStreamResponseObserver.onNext(publishStreamResponse);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.service;

import com.hedera.block.server.block.BlockInfo;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.helidon.webserver.WebServer;

Expand Down Expand Up @@ -38,4 +39,33 @@ public interface ServiceStatus {
* @param className the name of the class stopping the service
*/
void stopWebServer(final String className);

/**
* Gets the latest acked block number.
*
* @return the latest acked block number
*/
BlockInfo getLatestAckedBlock();

/**
* Sets the latest acked block number.
*
* @param latestAckedBlockInfo the latest acked block number
*/
void setLatestAckedBlock(BlockInfo latestAckedBlockInfo);

/**
* Gets the latest received block number, when ack is skipped it might be used instead of last acked block number.
* Also, if persistence + verification is in progress, it might be used to check if the block is already received.
*
* @return the latest received block number
*/
long getLatestReceivedBlockNumber();

/**
* Sets the latest received block number. should be set when a block_header is received and before the first batch is placed on the ring buffer.
*
* @param latestReceivedBlockNumber the latest received block number
*/
void setLatestReceivedBlockNumber(long latestReceivedBlockNumber);
}
Loading

0 comments on commit 29a2699

Please sign in to comment.