diff --git a/simulator/build.gradle.kts b/simulator/build.gradle.kts index 5464afecf..2296e9c62 100644 --- a/simulator/build.gradle.kts +++ b/simulator/build.gradle.kts @@ -34,19 +34,32 @@ testModuleInfo { } // Task to run simulator in Publisher mode -tasks.register("runPublisher") { - description = "Run the simulator in Publisher mode" +tasks.register("runPublisherClient") { + description = "Run the simulator in Publisher Client mode" group = "application" mainClass = application.mainClass mainModule = application.mainModule classpath = sourceSets["main"].runtimeClasspath - environment("BLOCK_STREAM_SIMULATOR_MODE", "PUBLISHER") + environment("BLOCK_STREAM_SIMULATOR_MODE", "PUBLISHER_CLIENT") environment("PROMETHEUS_ENDPOINT_ENABLED", "true") environment("PROMETHEUS_ENDPOINT_PORT_NUMBER", "9998") } +tasks.register("runPublisherServer") { + description = "Run the simulator in Publisher Server mode" + group = "application" + + mainClass = application.mainClass + mainModule = application.mainModule + classpath = sourceSets["main"].runtimeClasspath + + environment("BLOCK_STREAM_SIMULATOR_MODE", "PUBLISHER_SERVER") + environment("PROMETHEUS_ENDPOINT_ENABLED", "true") + environment("PROMETHEUS_ENDPOINT_PORT_NUMBER", "9996") +} + // Task to run simulator in Consumer mode tasks.register("runConsumer") { description = "Run the simulator in Consumer mode" diff --git a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java index bf6f6a5e7..69de02651 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java +++ b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java @@ -6,17 +6,12 @@ import static java.lang.System.Logger.Level.INFO; import static java.util.Objects.requireNonNull; -import com.hedera.block.simulator.config.data.BlockStreamConfig; import com.hedera.block.simulator.config.data.StreamStatus; -import com.hedera.block.simulator.config.types.SimulatorMode; import com.hedera.block.simulator.exception.BlockSimulatorParsingException; import com.hedera.block.simulator.generator.BlockStreamManager; import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient; import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; -import com.hedera.block.simulator.metrics.MetricsService; -import com.hedera.block.simulator.mode.CombinedModeHandler; -import com.hedera.block.simulator.mode.ConsumerModeHandler; -import com.hedera.block.simulator.mode.PublisherModeHandler; +import com.hedera.block.simulator.grpc.PublishStreamGrpcServer; import com.hedera.block.simulator.mode.SimulatorModeHandler; import com.swirlds.config.api.Configuration; import edu.umd.cs.findbugs.annotations.NonNull; @@ -46,6 +41,7 @@ public class BlockStreamSimulatorApp { // Service dependencies private final PublishStreamGrpcClient publishStreamGrpcClient; + private final PublishStreamGrpcServer publishStreamGrpcServer; private final ConsumerStreamGrpcClient consumerStreamGrpcClient; private final SimulatorModeHandler simulatorModeHandler; @@ -62,7 +58,6 @@ public class BlockStreamSimulatorApp { * generation * @param publishStreamGrpcClient The gRPC client for publishing blocks * @param consumerStreamGrpcClient The gRPC client for consuming blocks - * @param metricsService The service for recording metrics * @throws NullPointerException if any parameter is null * @throws IllegalArgumentException if an unknown simulator mode is configured */ @@ -71,27 +66,18 @@ public BlockStreamSimulatorApp( @NonNull Configuration configuration, @NonNull BlockStreamManager blockStreamManager, @NonNull PublishStreamGrpcClient publishStreamGrpcClient, + @NonNull PublishStreamGrpcServer publishStreamGrpcServer, @NonNull ConsumerStreamGrpcClient consumerStreamGrpcClient, - @NonNull MetricsService metricsService) { + @NonNull SimulatorModeHandler simulatorModeHandler) { requireNonNull(configuration); requireNonNull(blockStreamManager); loadLoggingProperties(); this.publishStreamGrpcClient = requireNonNull(publishStreamGrpcClient); + this.publishStreamGrpcServer = requireNonNull(publishStreamGrpcServer); this.consumerStreamGrpcClient = requireNonNull(consumerStreamGrpcClient); - - // Initialize the appropriate mode handler based on configuration - final BlockStreamConfig blockStreamConfig = - requireNonNull(configuration.getConfigData(BlockStreamConfig.class)); - // @todo(386) Load simulator mode using dagger - final SimulatorMode simulatorMode = blockStreamConfig.simulatorMode(); - this.simulatorModeHandler = switch (simulatorMode) { - case PUBLISHER -> new PublisherModeHandler( - blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); - case CONSUMER -> new ConsumerModeHandler(consumerStreamGrpcClient); - case BOTH -> new CombinedModeHandler(); - }; + this.simulatorModeHandler = requireNonNull(simulatorModeHandler); } /** @@ -148,8 +134,10 @@ public void stop() throws InterruptedException { public StreamStatus getStreamStatus() { return StreamStatus.builder() .publishedBlocks(publishStreamGrpcClient.getPublishedBlocks()) + .processedBlocks(publishStreamGrpcServer.getProcessedBlocks()) .consumedBlocks(consumerStreamGrpcClient.getConsumedBlocks()) - .lastKnownPublisherStatuses(publishStreamGrpcClient.getLastKnownStatuses()) + .lastKnownPublisherClientStatuses(publishStreamGrpcClient.getLastKnownStatuses()) + .lastKnownPublisherServerStatuses(publishStreamGrpcServer.getLastKnownStatuses()) .lastKnownConsumersStatuses(consumerStreamGrpcClient.getLastKnownStatuses()) .build(); } diff --git a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorInjectionComponent.java b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorInjectionComponent.java index c2a24ba04..9ce01413a 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorInjectionComponent.java +++ b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorInjectionComponent.java @@ -5,6 +5,7 @@ import com.hedera.block.simulator.generator.GeneratorInjectionModule; import com.hedera.block.simulator.grpc.GrpcInjectionModule; import com.hedera.block.simulator.metrics.MetricsInjectionModule; +import com.hedera.block.simulator.mode.SimulatorModeInjectionModule; import com.swirlds.config.api.Configuration; import dagger.BindsInstance; import dagger.Component; @@ -18,6 +19,7 @@ ConfigInjectionModule.class, GeneratorInjectionModule.class, GrpcInjectionModule.class, + SimulatorModeInjectionModule.class }) public interface BlockStreamSimulatorInjectionComponent { diff --git a/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java b/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java index 5a29e0b6a..3f1fc105e 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java +++ b/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java @@ -19,7 +19,7 @@ */ @ConfigData("blockStream") public record BlockStreamConfig( - @ConfigProperty(defaultValue = "PUBLISHER") SimulatorMode simulatorMode, + @ConfigProperty(defaultValue = "PUBLISHER_SERVER") SimulatorMode simulatorMode, @ConfigProperty(defaultValue = "10") int lastKnownStatusesCapacity, @ConfigProperty(defaultValue = "1_500_000") int delayBetweenBlockItems, @ConfigProperty(defaultValue = "100_000") int maxBlockItemsToStream, @@ -40,7 +40,7 @@ public static Builder builder() { * A builder for creating instances of {@link BlockStreamConfig}. */ public static class Builder { - private SimulatorMode simulatorMode = SimulatorMode.PUBLISHER; + private SimulatorMode simulatorMode = SimulatorMode.PUBLISHER_CLIENT; private int lastKnownStatusesCapacity = 10; private int delayBetweenBlockItems = 1_500_000; private int maxBlockItemsToStream = 10_000; diff --git a/simulator/src/main/java/com/hedera/block/simulator/config/data/StreamStatus.java b/simulator/src/main/java/com/hedera/block/simulator/config/data/StreamStatus.java index 6416dfbd1..63c4c98b1 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/config/data/StreamStatus.java +++ b/simulator/src/main/java/com/hedera/block/simulator/config/data/StreamStatus.java @@ -11,15 +11,19 @@ /** * Represents the status of the stream. * - * @param publishedBlocks the number of published blocks + * @param publishedBlocks the number of published blocks from publish client + * @param processedBlocks the number of processed blocks from publish server * @param consumedBlocks the number of consumed blocks - * @param lastKnownPublisherStatuses the last known publisher statuses + * @param lastKnownPublisherClientStatuses the last known statuses from publish client + * @param lastKnownPublisherServerStatuses the last known statuses from publish server * @param lastKnownConsumersStatuses the last known consumers statuses */ public record StreamStatus( long publishedBlocks, + long processedBlocks, long consumedBlocks, - Deque lastKnownPublisherStatuses, + Deque lastKnownPublisherClientStatuses, + Deque lastKnownPublisherServerStatuses, Deque lastKnownConsumersStatuses) { /** @@ -36,8 +40,10 @@ public static Builder builder() { */ public static class Builder { private long publishedBlocks = 0; + private long processedBlocks = 0; private long consumedBlocks = 0; - private Deque lastKnownPublisherStatuses = new ArrayDeque<>(); + private Deque lastKnownPublisherClientStatuses = new ArrayDeque<>(); + private Deque lastKnownPublisherServerStatuses = new ArrayDeque<>(); private Deque lastKnownConsumersStatuses = new ArrayDeque<>(); /** @@ -48,7 +54,7 @@ public Builder() { } /** - * Sets the number of published blocks. + * Sets the number of published blocks by publish client. * * @param publishedBlocks the number of published blocks * @return the builder instance @@ -59,6 +65,18 @@ public Builder publishedBlocks(long publishedBlocks) { return this; } + /** + * Sets the number of processed blocks by publish server. + * + * @param processedBlocks the number of processed blocks by publish server. + * @return the builder instance + */ + public Builder processedBlocks(long processedBlocks) { + requireWhole(processedBlocks); + this.processedBlocks = processedBlocks; + return this; + } + /** * Sets the number of consumed blocks. * @@ -72,14 +90,26 @@ public Builder consumedBlocks(long consumedBlocks) { } /** - * Sets the last known publisher statuses. + * Sets the last known publisher client statuses. + * + * @param lastKnownPublisherClientStatuses the last known publisher statuses from publish client + * @return the builder instance + */ + public Builder lastKnownPublisherClientStatuses(List lastKnownPublisherClientStatuses) { + requireNonNull(lastKnownPublisherClientStatuses); + this.lastKnownPublisherClientStatuses = new ArrayDeque<>(lastKnownPublisherClientStatuses); + return this; + } + + /** + * Sets the last known publisher server statuses. * - * @param lastKnownPublisherStatuses the last known publisher statuses + * @param lastKnownPublisherServerStatuses the last known publisher statuses from publish server * @return the builder instance */ - public Builder lastKnownPublisherStatuses(List lastKnownPublisherStatuses) { - requireNonNull(lastKnownPublisherStatuses); - this.lastKnownPublisherStatuses = new ArrayDeque<>(lastKnownPublisherStatuses); + public Builder lastKnownPublisherServerStatuses(List lastKnownPublisherServerStatuses) { + requireNonNull(lastKnownPublisherServerStatuses); + this.lastKnownPublisherServerStatuses = new ArrayDeque<>(lastKnownPublisherServerStatuses); return this; } @@ -102,7 +132,12 @@ public Builder lastKnownConsumersStatuses(List lastKnownConsumersStatuse */ public StreamStatus build() { return new StreamStatus( - publishedBlocks, consumedBlocks, lastKnownPublisherStatuses, lastKnownConsumersStatuses); + publishedBlocks, + processedBlocks, + consumedBlocks, + lastKnownPublisherClientStatuses, + lastKnownPublisherServerStatuses, + lastKnownConsumersStatuses); } } } diff --git a/simulator/src/main/java/com/hedera/block/simulator/config/types/SimulatorMode.java b/simulator/src/main/java/com/hedera/block/simulator/config/types/SimulatorMode.java index 454ea44d0..33c405319 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/config/types/SimulatorMode.java +++ b/simulator/src/main/java/com/hedera/block/simulator/config/types/SimulatorMode.java @@ -3,16 +3,16 @@ /** The SimulatorMode enum defines the work modes of the block stream simulator. */ public enum SimulatorMode { - /** - * Indicates a work mode in which the simulator is working as both consumer and publisher. - */ - BOTH, /** * Indicates a work mode in which the simulator is working in consumer mode. */ CONSUMER, + /** + * Indicates a work mode in which the simulator is working as both consumer and publisher. + */ + PUBLISHER_SERVER, /** * Indicates a work mode in which the simulator is working in publisher mode. */ - PUBLISHER + PUBLISHER_CLIENT } diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java index 9057bd6aa..40ad58ac2 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java @@ -3,6 +3,7 @@ import com.hedera.block.simulator.grpc.impl.ConsumerStreamGrpcClientImpl; import com.hedera.block.simulator.grpc.impl.PublishStreamGrpcClientImpl; +import com.hedera.block.simulator.grpc.impl.PublishStreamGrpcServerImpl; import dagger.Binds; import dagger.Module; import dagger.Provides; @@ -33,6 +34,16 @@ public interface GrpcInjectionModule { @Binds ConsumerStreamGrpcClient bindConsumerStreamGrpcClient(ConsumerStreamGrpcClientImpl consumerStreamGrpcClient); + /** + * Binds the PublishStreamGrpcServer to the PublishStreamGrpcServerImpl. + * + * @param PublishStreamGrpcServer the PublishStreamGrpcServerImpl + * @return the ConsumerStreamGrpcClient + */ + @Singleton + @Binds + PublishStreamGrpcServer bindPublishStreamGrpcServer(PublishStreamGrpcServerImpl PublishStreamGrpcServer); + /** * Provides the stream enabled flag * diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcServer.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcServer.java new file mode 100644 index 000000000..8458ce99b --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/PublishStreamGrpcServer.java @@ -0,0 +1,37 @@ +// SPDX-License-Identifier: Apache-2.0 +package com.hedera.block.simulator.grpc; + +import java.util.List; + +public interface PublishStreamGrpcServer { + /** + * Initialize, opens a gRPC channel and creates the needed services with the passed configuration. + */ + void init(); + + /** + * Starts the gRPC server. + */ + void start(); + + /** + * Gets the number of processed blocks. + * + * @return the number of published blocks + */ + long getProcessedBlocks(); + + /** + * Gets the last known statuses. + * + * @return the last known statuses + */ + List getLastKnownStatuses(); + + /** + * Shutdowns the server. + * + * @throws InterruptedException if the thread is interrupted + */ + void shutdown() throws InterruptedException; +} diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcServerImpl.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcServerImpl.java new file mode 100644 index 000000000..af835b5db --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcServerImpl.java @@ -0,0 +1,125 @@ +// SPDX-License-Identifier: Apache-2.0 +package com.hedera.block.simulator.grpc.impl; + +import static com.hedera.block.simulator.metrics.SimulatorMetricTypes.Counter.LiveBlocksProcessed; +import static java.lang.System.Logger.Level.ERROR; +import static java.util.Objects.requireNonNull; + +import com.hedera.block.simulator.config.data.BlockStreamConfig; +import com.hedera.block.simulator.config.data.GrpcConfig; +import com.hedera.block.simulator.grpc.PublishStreamGrpcServer; +import com.hedera.block.simulator.metrics.MetricsService; +import com.hedera.hapi.block.protoc.BlockStreamServiceGrpc; +import com.hedera.hapi.block.protoc.PublishStreamRequest; +import com.hedera.hapi.block.protoc.PublishStreamResponse; +import edu.umd.cs.findbugs.annotations.NonNull; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.List; +import javax.inject.Inject; + +/** + * Implementation of {@link PublishStreamGrpcServer} that handles incoming block stream publications + * via gRPC streaming. This implementation manages the server setup, handles incoming block streams, + * tracks processed blocks, and maintains a history of stream statuses. It provides functionality + * to start, monitor, and shutdown the gRPC server. + */ +public class PublishStreamGrpcServerImpl implements PublishStreamGrpcServer { + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + // gRPC Components + private Server server; + + // Configuration + private final GrpcConfig grpcConfig; + + // Service dependencies + private final MetricsService metricsService; + + // State + private final int lastKnownStatusesCapacity; + private final Deque lastKnownStatuses; + + /** + * Constructs a new PublishStreamGrpcServerImpl. + * + * @param grpcConfig Configuration for the gRPC server settings + * @param blockStreamConfig Configuration for the block stream settings + * @param metricsService Service for tracking metrics + * @throws NullPointerException if any of the parameters are null + */ + @Inject + public PublishStreamGrpcServerImpl( + @NonNull final GrpcConfig grpcConfig, + @NonNull final BlockStreamConfig blockStreamConfig, + @NonNull final MetricsService metricsService) { + this.grpcConfig = requireNonNull(grpcConfig); + this.metricsService = requireNonNull(metricsService); + + this.lastKnownStatusesCapacity = blockStreamConfig.lastKnownStatusesCapacity(); + lastKnownStatuses = new ArrayDeque<>(this.lastKnownStatusesCapacity); + } + + /** + * Initialize, opens a gRPC channel and creates the needed services with the passed configuration. + */ + @Override + public void init() { + server = ServerBuilder.forPort(grpcConfig.port()) + .addService(new BlockStreamServiceGrpc.BlockStreamServiceImplBase() { + @Override + public StreamObserver publishBlockStream( + StreamObserver responseObserver) { + return new PublishStreamServerObserver( + responseObserver, metricsService, lastKnownStatuses, lastKnownStatusesCapacity); + } + }) + .build(); + } + + /** + * Starts the gRPC server. + */ + @Override + public void start() { + try { + server.start(); + } catch (IOException e) { + LOGGER.log(ERROR, "Something went wrong, while trying to start the gRPC server. Error: %s".formatted(e)); + } + } + + /** + * Gets the number of processed blocks. + * + * @return the number of processed blocks + */ + @Override + public long getProcessedBlocks() { + return metricsService.get(LiveBlocksProcessed).get(); + } + + /** + * Gets the last known statuses. + * + * @return the last known statuses + */ + @Override + public List getLastKnownStatuses() { + return List.copyOf(lastKnownStatuses); + } + + /** + * Shutdowns the server. + * + * @throws InterruptedException if the thread is interrupted + */ + @Override + public void shutdown() throws InterruptedException { + server.shutdown(); + } +} diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java index 09eb4b444..5af583560 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java @@ -26,7 +26,6 @@ public class PublishStreamObserver implements StreamObserver { + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + // Service dependencies + private final MetricsService metricsService; + + // gRPC Components + private final StreamObserver responseObserver; + + // State + private final int lastKnownStatusesCapacity; + private final Deque lastKnownStatuses; + + /** + * Constructs a new PublishStreamServerObserver that handles stream requests and maintains a history of statuses. + * + * @param responseObserver The observer that handles responses back to the client + * @param metricsService The service for recording consumption metrics + * @param lastKnownStatuses A deque to store the history of request statuses + * @param lastKnownStatusesCapacity The maximum number of statuses to maintain in the history + * @throws NullPointerException if responseObserver or lastKnownStatuses is null + */ + public PublishStreamServerObserver( + @NonNull final StreamObserver responseObserver, + @NonNull final MetricsService metricsService, + @NonNull final Deque lastKnownStatuses, + final int lastKnownStatusesCapacity) { + this.responseObserver = requireNonNull(responseObserver); + this.metricsService = requireNonNull(metricsService); + this.lastKnownStatuses = requireNonNull(lastKnownStatuses); + this.lastKnownStatusesCapacity = lastKnownStatusesCapacity; + } + + /** + * Processes incoming publish stream requests, maintaining a history of requests and handling block acknowledgements. + * If the request contains block items with a block proof, generates and sends an acknowledgement response. + * + * @param publishStreamRequest The incoming stream request to process + */ + @Override + public void onNext(PublishStreamRequest publishStreamRequest) { + if (lastKnownStatuses.size() >= lastKnownStatusesCapacity) { + lastKnownStatuses.pollFirst(); + } + lastKnownStatuses.add(publishStreamRequest.toString()); + + if (publishStreamRequest.hasBlockItems()) { + final List blockItemList = + publishStreamRequest.getBlockItems().getBlockItemsList(); + if (blockItemList.getLast().hasBlockProof()) { + final BlockProof blockProof = publishStreamRequest + .getBlockItems() + .getBlockItemsList() + .getLast() + .getBlockProof(); + final PublishStreamResponse publishStreamResponse = handleBlockAckResponse(blockProof); + + responseObserver.onNext(publishStreamResponse); + metricsService.get(LiveBlocksProcessed).increment(); + } + } + } + + /** + * Handles errors that occur during stream processing. + * + * @param streamError The error that occurred during stream processing + */ + @Override + public void onError(@NonNull final Throwable streamError) { + Status status = Status.fromThrowable(streamError); + LOGGER.log(ERROR, "Error %s with status %s.".formatted(streamError, status), streamError); + } + + /** + * Handles the completion of the stream by completing the response observer and logging the event. + */ + @Override + public void onCompleted() { + responseObserver.onCompleted(); + LOGGER.log(INFO, "Completed"); + } + + private PublishStreamResponse handleBlockAckResponse(BlockProof blockProof) { + final long blockNumber = blockProof.getBlock(); + final BlockAcknowledgement blockAcknowledgement = + BlockAcknowledgement.newBuilder().setBlockNumber(blockNumber).build(); + final Acknowledgement ack = + Acknowledgement.newBuilder().setBlockAck(blockAcknowledgement).build(); + LOGGER.log(INFO, "Returning block acknowledgement for block number: %s".formatted(blockNumber)); + + return PublishStreamResponse.newBuilder().setAcknowledgement(ack).build(); + } +} diff --git a/simulator/src/main/java/com/hedera/block/simulator/metrics/SimulatorMetricTypes.java b/simulator/src/main/java/com/hedera/block/simulator/metrics/SimulatorMetricTypes.java index 37f6091ce..f73e01319 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/metrics/SimulatorMetricTypes.java +++ b/simulator/src/main/java/com/hedera/block/simulator/metrics/SimulatorMetricTypes.java @@ -24,6 +24,8 @@ public enum Counter implements SimulatorMetricMetadata { LiveBlockItemsSent("live_block_items_sent", "Live Block Items Sent"), /** The number of live blocks sent by the simulator */ LiveBlocksSent("live_blocks_sent", "Live Blocks Sent"), + /** The number of live blocks processed by the simulator publish server */ + LiveBlocksProcessed("live_blocks_processed_by_server", "Live Blocks Processed by Publish Server"), /** The number of live blocks consumed by the simulator */ LiveBlocksConsumed("live_blocks_consumed", "Live Blocks Consumed"); diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/CombinedModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/CombinedModeHandler.java deleted file mode 100644 index 37beb2e70..000000000 --- a/simulator/src/main/java/com/hedera/block/simulator/mode/CombinedModeHandler.java +++ /dev/null @@ -1,54 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -package com.hedera.block.simulator.mode; - -import com.hedera.block.simulator.config.data.BlockStreamConfig; - -/** - * The {@code CombinedModeHandler} class implements the {@link SimulatorModeHandler} interface - * and provides the behavior for a mode where both consuming and publishing of block data - * occur simultaneously. - * - *

This mode handles dual operations in the block streaming process, utilizing the - * {@link BlockStreamConfig} for configuration settings. It is designed for scenarios where - * the simulator needs to handle both the consumption and publication of blocks in parallel. - * - *

For now, the actual start behavior is not implemented, as indicated by the - * {@link UnsupportedOperationException}. - */ -public class CombinedModeHandler implements SimulatorModeHandler { - - /** - * Constructs a new {@code CombinedModeHandler} with the specified configuration. - */ - public CombinedModeHandler() {} - - /** - * Initializes resources for both consuming and publishing blocks. - * - * @throws UnsupportedOperationException as this functionality is not yet implemented - */ - @Override - public void init() { - throw new UnsupportedOperationException(); - } - - /** - * Starts both consuming and publishing blocks simultaneously. - * - * @throws UnsupportedOperationException as this functionality is not yet implemented - */ - @Override - public void start() { - throw new UnsupportedOperationException(); - } - - /** - * Gracefully stops both consumption and publishing of blocks. - * - * @throws UnsupportedOperationException as this functionality is not yet implemented - */ - @Override - public void stop() { - throw new UnsupportedOperationException(); - } -} diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeHandler.java index 1f42078a6..45c347542 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeHandler.java +++ b/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeHandler.java @@ -13,8 +13,8 @@ *

Examples of working modes include: *

    *
  • Consumer mode: The simulator consumes data from the block stream.
  • - *
  • Publisher mode: The simulator publishes data to the block stream.
  • - *
  • Combined mode: The simulator handles both consuming and publishing.
  • + *
  • Publisher Client mode: The simulator publishes data to the block stream.
  • + *
  • Publisher Server mode: The simulator receives blocks from client and sends back acknowledgments or errors.
  • *
*/ public interface SimulatorModeHandler { @@ -27,7 +27,7 @@ public interface SimulatorModeHandler { /** * Starts the simulator and initiates the streaming process according to the configured mode. - * The behavior of this method depends on the specific working mode (consumer, publisher, or combined). + * The behavior of this method depends on the specific working mode (consumer, publisher in client mode, or publisher in server mode). * * @throws BlockSimulatorParsingException if an error occurs while parsing blocks * @throws IOException if an I/O error occurs during block streaming diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeInjectionModule.java b/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeInjectionModule.java new file mode 100644 index 000000000..6bd0aed0b --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/mode/SimulatorModeInjectionModule.java @@ -0,0 +1,42 @@ +// SPDX-License-Identifier: Apache-2.0 +package com.hedera.block.simulator.mode; + +import com.hedera.block.simulator.config.data.BlockStreamConfig; +import com.hedera.block.simulator.config.types.SimulatorMode; +import com.hedera.block.simulator.generator.BlockStreamManager; +import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient; +import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; +import com.hedera.block.simulator.grpc.PublishStreamGrpcServer; +import com.hedera.block.simulator.metrics.MetricsService; +import com.hedera.block.simulator.mode.impl.ConsumerModeHandler; +import com.hedera.block.simulator.mode.impl.PublisherClientModeHandler; +import com.hedera.block.simulator.mode.impl.PublisherServerModeHandler; +import com.swirlds.config.api.Configuration; +import dagger.Module; +import dagger.Provides; +import edu.umd.cs.findbugs.annotations.NonNull; +import javax.inject.Singleton; + +@Module +public interface SimulatorModeInjectionModule { + + @Singleton + @Provides + static SimulatorModeHandler providesSimulatorModeHandler( + @NonNull Configuration configuration, + @NonNull BlockStreamManager blockStreamManager, + @NonNull PublishStreamGrpcClient publishStreamGrpcClient, + @NonNull PublishStreamGrpcServer publishStreamGrpcServer, + @NonNull ConsumerStreamGrpcClient consumerStreamGrpcClient, + @NonNull MetricsService metricsService) { + + final BlockStreamConfig blockStreamConfig = configuration.getConfigData(BlockStreamConfig.class); + final SimulatorMode simulatorMode = blockStreamConfig.simulatorMode(); + return switch (simulatorMode) { + case PUBLISHER_CLIENT -> new PublisherClientModeHandler( + blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); + case PUBLISHER_SERVER -> new PublisherServerModeHandler(publishStreamGrpcServer); + case CONSUMER -> new ConsumerModeHandler(consumerStreamGrpcClient); + }; + } +} diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/ConsumerModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/impl/ConsumerModeHandler.java similarity index 94% rename from simulator/src/main/java/com/hedera/block/simulator/mode/ConsumerModeHandler.java rename to simulator/src/main/java/com/hedera/block/simulator/mode/impl/ConsumerModeHandler.java index efca1a851..900f1aaaa 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/mode/ConsumerModeHandler.java +++ b/simulator/src/main/java/com/hedera/block/simulator/mode/impl/ConsumerModeHandler.java @@ -1,12 +1,14 @@ // SPDX-License-Identifier: Apache-2.0 -package com.hedera.block.simulator.mode; +package com.hedera.block.simulator.mode.impl; import static java.lang.System.Logger.Level.INFO; import static java.util.Objects.requireNonNull; import com.hedera.block.simulator.config.data.BlockStreamConfig; import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient; +import com.hedera.block.simulator.mode.SimulatorModeHandler; import edu.umd.cs.findbugs.annotations.NonNull; +import javax.inject.Inject; /** * The {@code ConsumerModeHandler} class implements the {@link SimulatorModeHandler} interface @@ -32,6 +34,7 @@ public class ConsumerModeHandler implements SimulatorModeHandler { * @param consumerStreamGrpcClient The client for consuming blocks via gRPC * @throws NullPointerException if any parameter is null */ + @Inject public ConsumerModeHandler(@NonNull final ConsumerStreamGrpcClient consumerStreamGrpcClient) { this.consumerStreamGrpcClient = requireNonNull(consumerStreamGrpcClient); } diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/impl/PublisherClientModeHandler.java similarity index 95% rename from simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java rename to simulator/src/main/java/com/hedera/block/simulator/mode/impl/PublisherClientModeHandler.java index dd1c94a50..0a4dd94c2 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/mode/PublisherModeHandler.java +++ b/simulator/src/main/java/com/hedera/block/simulator/mode/impl/PublisherClientModeHandler.java @@ -1,5 +1,5 @@ // SPDX-License-Identifier: Apache-2.0 -package com.hedera.block.simulator.mode; +package com.hedera.block.simulator.mode.impl; import static com.hedera.block.simulator.Constants.NANOS_PER_MILLI; import static com.hedera.block.simulator.metrics.SimulatorMetricTypes.Counter.LiveBlockItemsSent; @@ -12,16 +12,17 @@ import com.hedera.block.simulator.generator.BlockStreamManager; import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; import com.hedera.block.simulator.metrics.MetricsService; +import com.hedera.block.simulator.mode.SimulatorModeHandler; import com.hedera.hapi.block.stream.protoc.Block; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; +import javax.inject.Inject; /** * The {@code PublisherModeHandler} class implements the * {@link SimulatorModeHandler} interface - * and provides the behavior for a mode where only publishing of block data - * occurs. + * and provides the behavior for a mode where simulator is working using PublishBlockStream and acts as a client. * *

* This mode handles single operation in the block streaming process, utilizing @@ -30,7 +31,7 @@ * scenarios where * the simulator needs to handle publication of blocks. */ -public class PublisherModeHandler implements SimulatorModeHandler { +public class PublisherClientModeHandler implements SimulatorModeHandler { private final System.Logger LOGGER = System.getLogger(getClass().getName()); // Configuration fields @@ -56,7 +57,8 @@ public class PublisherModeHandler implements SimulatorModeHandler { * @param metricsService The service for recording metrics * @throws NullPointerException if any parameter is null */ - public PublisherModeHandler( + @Inject + public PublisherClientModeHandler( @NonNull final BlockStreamConfig blockStreamConfig, @NonNull final PublishStreamGrpcClient publishStreamGrpcClient, @NonNull final BlockStreamManager blockStreamManager, diff --git a/simulator/src/main/java/com/hedera/block/simulator/mode/impl/PublisherServerModeHandler.java b/simulator/src/main/java/com/hedera/block/simulator/mode/impl/PublisherServerModeHandler.java new file mode 100644 index 000000000..206909fa2 --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/mode/impl/PublisherServerModeHandler.java @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: Apache-2.0 +package com.hedera.block.simulator.mode.impl; + +import static java.lang.System.Logger.Level.INFO; +import static java.util.Objects.requireNonNull; + +import com.hedera.block.simulator.grpc.PublishStreamGrpcServer; +import com.hedera.block.simulator.mode.SimulatorModeHandler; +import edu.umd.cs.findbugs.annotations.NonNull; +import javax.inject.Inject; + +/** + * The {@code PublisherServerModeHandler} class implements the {@link SimulatorModeHandler} interface + * and provides the behavior for a mode where the simulator acts as a server accepting block stream data + * via the publish protocol. + * + *

This mode manages a gRPC server that listens for incoming block stream connections. It handles + * the initialization, startup, and shutdown of the server components through the {@link PublishStreamGrpcServer}. + */ +public class PublisherServerModeHandler implements SimulatorModeHandler { + private final System.Logger LOGGER = System.getLogger(getClass().getName()); + + // Service dependencies + private final PublishStreamGrpcServer publishStreamGrpcServer; + /** + * Constructs a new {@code PublisherServerModeHandler} with the specified configuration. + */ + @Inject + public PublisherServerModeHandler(@NonNull final PublishStreamGrpcServer publishStreamGrpcServer) { + this.publishStreamGrpcServer = requireNonNull(publishStreamGrpcServer); + } + + /** + * Initializes the publisher server mode by setting up the gRPC server. + * This method must be called before {@link #start()}. + */ + @Override + public void init() { + publishStreamGrpcServer.init(); + LOGGER.log(INFO, "gRPC Server initialized for receiving blocks using publish protocol."); + } + + /** + * Starts the publisher server mode by activating the gRPC server to begin accepting connections + * and receiving block stream data. This method should only be called after {@link #init()}. + */ + @Override + public void start() { + publishStreamGrpcServer.start(); + LOGGER.log(INFO, "gRPC Server started successfully."); + } + + /** + * Stops the publisher server mode by gracefully shutting down the gRPC server. + * This method ensures all resources are properly released. + * + * @throws InterruptedException if the shutdown process is interrupted + */ + @Override + public void stop() throws InterruptedException { + publishStreamGrpcServer.shutdown(); + } +} diff --git a/simulator/src/main/java/module-info.java b/simulator/src/main/java/module-info.java index db16c6729..cbc44dbce 100644 --- a/simulator/src/main/java/module-info.java +++ b/simulator/src/main/java/module-info.java @@ -11,6 +11,8 @@ exports com.hedera.block.simulator.generator; exports com.hedera.block.simulator.metrics; exports com.hedera.block.simulator.grpc.impl; + exports com.hedera.block.simulator.mode; + exports com.hedera.block.simulator.mode.impl; requires com.hedera.block.common; requires com.hedera.block.stream; diff --git a/simulator/src/main/resources/app.properties b/simulator/src/main/resources/app.properties index 03d1649bc..ad350fb31 100644 --- a/simulator/src/main/resources/app.properties +++ b/simulator/src/main/resources/app.properties @@ -8,7 +8,7 @@ grpc.port=8080 # ---------------------------------------------- # BlockStreamConfig # ---------------------------------------------- -blockStream.simulatorMode=PUBLISHER +blockStream.simulatorMode=PUBLISHER_CLIENT # ---------------------------------------------- # Prometheus Config # ---------------------------------------------- diff --git a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java index 336c373cd..d5f41bce6 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java @@ -7,7 +7,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertIterableEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeast; @@ -21,9 +20,12 @@ import com.hedera.block.simulator.generator.BlockStreamManager; import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient; import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; +import com.hedera.block.simulator.grpc.PublishStreamGrpcServer; import com.hedera.block.simulator.metrics.MetricsService; import com.hedera.block.simulator.metrics.MetricsServiceImpl; -import com.hedera.block.simulator.mode.PublisherModeHandler; +import com.hedera.block.simulator.mode.SimulatorModeHandler; +import com.hedera.block.simulator.mode.impl.ConsumerModeHandler; +import com.hedera.block.simulator.mode.impl.PublisherClientModeHandler; import com.hedera.hapi.block.stream.output.protoc.BlockHeader; import com.hedera.hapi.block.stream.protoc.Block; import com.hedera.hapi.block.stream.protoc.BlockItem; @@ -52,9 +54,15 @@ class BlockStreamSimulatorTest { @Mock private PublishStreamGrpcClient publishStreamGrpcClient; + @Mock + private PublishStreamGrpcServer publishStreamGrpcServer; + @Mock private ConsumerStreamGrpcClient consumerStreamGrpcClient; + @Mock + private SimulatorModeHandler simulatorModeHandler; + private BlockStreamSimulatorApp blockStreamSimulator; private MetricsService metricsService; @@ -66,7 +74,12 @@ void setUp() throws IOException { metricsService = new MetricsServiceImpl(getTestMetrics(configuration)); blockStreamSimulator = new BlockStreamSimulatorApp( - configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService); + configuration, + blockStreamManager, + publishStreamGrpcClient, + publishStreamGrpcServer, + consumerStreamGrpcClient, + simulatorModeHandler); } @AfterEach @@ -92,11 +105,17 @@ void startPublishing_logsStartedMessage() throws InterruptedException, BlockSimu @Test void startConsuming() throws IOException, BlockSimulatorParsingException, InterruptedException { + SimulatorModeHandler consumerModeHandler = new ConsumerModeHandler(consumerStreamGrpcClient); Configuration configuration = TestUtils.getTestConfiguration(Map.of("blockStream.simulatorMode", "CONSUMER")); metricsService = new MetricsServiceImpl(getTestMetrics(configuration)); blockStreamSimulator = new BlockStreamSimulatorApp( - configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService); + configuration, + blockStreamManager, + publishStreamGrpcClient, + publishStreamGrpcServer, + consumerStreamGrpcClient, + consumerModeHandler); blockStreamSimulator.start(); verify(consumerStreamGrpcClient).init(); @@ -106,7 +125,6 @@ void startConsuming() throws IOException, BlockSimulatorParsingException, Interr @Test void start_constantRateStreaming() throws InterruptedException, BlockSimulatorParsingException, IOException { - BlockItem blockItem = BlockItem.newBuilder() .setBlockHeader(BlockHeader.newBuilder().setNumber(1L).build()) .build(); @@ -119,10 +137,14 @@ void start_constantRateStreaming() throws InterruptedException, BlockSimulatorPa .build(); BlockStreamManager blockStreamManager = mock(BlockStreamManager.class); + BlockStreamConfig blockStreamConfig = mock(BlockStreamConfig.class); + SimulatorModeHandler publisherClientModeHandler = new PublisherClientModeHandler( + blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); + when(blockStreamManager.getNextBlock()).thenReturn(block1, block2, null); Configuration configuration = TestUtils.getTestConfiguration(Map.of( "blockStream.simulatorMode", - "PUBLISHER", + "PUBLISHER_CLIENT", "blockStream.maxBlockItemsToStream", "2", "generator.managerImplementation", @@ -135,7 +157,12 @@ void start_constantRateStreaming() throws InterruptedException, BlockSimulatorPa "2")); BlockStreamSimulatorApp blockStreamSimulator = new BlockStreamSimulatorApp( - configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService); + configuration, + blockStreamManager, + publishStreamGrpcClient, + publishStreamGrpcServer, + consumerStreamGrpcClient, + publisherClientModeHandler); blockStreamSimulator.start(); assertTrue(blockStreamSimulator.isRunning()); @@ -147,11 +174,20 @@ private String getAbsoluteFolder(String relativePath) { @Test void stopPublishing_doesNotThrowException() throws InterruptedException, IOException { - Configuration configuration = TestUtils.getTestConfiguration(Map.of("blockStream.simulatorMode", "PUBLISHER")); + BlockStreamConfig blockStreamConfig = mock(BlockStreamConfig.class); + SimulatorModeHandler publisherClientModeHandler = new PublisherClientModeHandler( + blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); + Configuration configuration = + TestUtils.getTestConfiguration(Map.of("blockStream.simulatorMode", "PUBLISHER_CLIENT")); metricsService = new MetricsServiceImpl(getTestMetrics(configuration)); blockStreamSimulator = new BlockStreamSimulatorApp( - configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService); + configuration, + blockStreamManager, + publishStreamGrpcClient, + publishStreamGrpcServer, + consumerStreamGrpcClient, + publisherClientModeHandler); assertDoesNotThrow(() -> blockStreamSimulator.stop()); assertFalse(blockStreamSimulator.isRunning()); @@ -160,11 +196,17 @@ void stopPublishing_doesNotThrowException() throws InterruptedException, IOExcep @Test void stopConsuming_doesNotThrowException() throws InterruptedException, IOException { + SimulatorModeHandler consumerModeHandler = new ConsumerModeHandler(consumerStreamGrpcClient); Configuration configuration = TestUtils.getTestConfiguration(Map.of("blockStream.simulatorMode", "CONSUMER")); metricsService = new MetricsServiceImpl(getTestMetrics(configuration)); blockStreamSimulator = new BlockStreamSimulatorApp( - configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService); + configuration, + blockStreamManager, + publishStreamGrpcClient, + publishStreamGrpcServer, + consumerStreamGrpcClient, + consumerModeHandler); assertDoesNotThrow(() -> blockStreamSimulator.stop()); assertFalse(blockStreamSimulator.isRunning()); verify(consumerStreamGrpcClient, atLeast(1)).completeStreaming(); @@ -173,6 +215,10 @@ void stopConsuming_doesNotThrowException() throws InterruptedException, IOExcept @Test void start_millisPerBlockStreaming() throws InterruptedException, IOException, BlockSimulatorParsingException { BlockStreamManager blockStreamManager = mock(BlockStreamManager.class); + BlockStreamConfig blockStreamConfig = mock(BlockStreamConfig.class); + SimulatorModeHandler publisherClientModeHandler = new PublisherClientModeHandler( + blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); + BlockItem blockItem = BlockItem.newBuilder() .setBlockHeader(BlockHeader.newBuilder().setNumber(1L).build()) .build(); @@ -181,7 +227,7 @@ void start_millisPerBlockStreaming() throws InterruptedException, IOException, B Configuration configuration = TestUtils.getTestConfiguration(Map.of( "blockStream.simulatorMode", - "PUBLISHER", + "PUBLISHER_CLIENT", "blockStream.maxBlockItemsToStream", "2", "generator.managerImplementation", @@ -192,7 +238,12 @@ void start_millisPerBlockStreaming() throws InterruptedException, IOException, B "MILLIS_PER_BLOCK")); BlockStreamSimulatorApp blockStreamSimulator = new BlockStreamSimulatorApp( - configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService); + configuration, + blockStreamManager, + publishStreamGrpcClient, + publishStreamGrpcServer, + consumerStreamGrpcClient, + publisherClientModeHandler); blockStreamSimulator.start(); assertTrue(blockStreamSimulator.isRunning()); @@ -221,7 +272,7 @@ void start_millisPerSecond_streamingLagVerifyWarnLog() Configuration configuration = TestUtils.getTestConfiguration(Map.of( "blockStream.simulatorMode", - "PUBLISHER", + "PUBLISHER_CLIENT", "generator.managerImplementation", "BlockAsFileBlockStreamManager", "generator.rootPath", @@ -234,9 +285,16 @@ void start_millisPerSecond_streamingLagVerifyWarnLog() "10", "blockStream.blockItemsBatchSize", "1")); - + BlockStreamConfig blockStreamConfig = configuration.getConfigData(BlockStreamConfig.class); + SimulatorModeHandler publisherClientModeHandler = new PublisherClientModeHandler( + blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); BlockStreamSimulatorApp blockStreamSimulator = new BlockStreamSimulatorApp( - configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService); + configuration, + blockStreamManager, + publishStreamGrpcClient, + publishStreamGrpcServer, + consumerStreamGrpcClient, + publisherClientModeHandler); List logRecords = captureLogs(); blockStreamSimulator.start(); @@ -248,32 +306,6 @@ void start_millisPerSecond_streamingLagVerifyWarnLog() assertTrue(found_log); } - @Test - void start_withBothMode_throwsUnsupportedOperationException() throws Exception { - Configuration configuration = TestUtils.getTestConfiguration(Map.of("blockStream.simulatorMode", "BOTH")); - blockStreamSimulator = new BlockStreamSimulatorApp( - configuration, blockStreamManager, publishStreamGrpcClient, consumerStreamGrpcClient, metricsService); - assertThrows(UnsupportedOperationException.class, () -> blockStreamSimulator.start()); - } - - @Test - void constructor_throwsExceptionForNullSimulatorMode() { - Configuration configuration = mock(Configuration.class); - BlockStreamConfig blockStreamConfig = mock(BlockStreamConfig.class); - - when(configuration.getConfigData(BlockStreamConfig.class)).thenReturn(blockStreamConfig); - when(blockStreamConfig.simulatorMode()).thenReturn(null); - - assertThrows(NullPointerException.class, () -> { - new BlockStreamSimulatorApp( - configuration, - blockStreamManager, - publishStreamGrpcClient, - consumerStreamGrpcClient, - metricsService); - }); - } - @Test void testGetStreamStatus() { long expectedPublishedBlocks = 5; @@ -288,7 +320,7 @@ void testGetStreamStatus() { assertEquals(expectedPublishedBlocks, streamStatus.publishedBlocks(), "Published blocks should match"); assertIterableEquals( expectedLastKnownStatuses, - streamStatus.lastKnownPublisherStatuses(), + streamStatus.lastKnownPublisherClientStatuses(), "Last known statuses should match"); assertEquals(0, streamStatus.consumedBlocks(), "Consumed blocks should be 0 by default"); assertEquals( @@ -299,7 +331,7 @@ void testGetStreamStatus() { private List captureLogs() { // Capture logs - Logger logger = Logger.getLogger(PublisherModeHandler.class.getName()); + Logger logger = Logger.getLogger(PublisherClientModeHandler.class.getName()); final List logRecords = new ArrayList<>(); // Custom handler to capture logs diff --git a/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java b/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java index 93b9097ae..2accce709 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java @@ -55,12 +55,21 @@ void testStreamConfigBuilder() { } @Test - void testSimulatorMode() { + void testSimulatorPublishClientMode() { BlockStreamConfig config = getBlockStreamConfigBuilder() - .simulatorMode(SimulatorMode.PUBLISHER) + .simulatorMode(SimulatorMode.PUBLISHER_CLIENT) .build(); - assertEquals(SimulatorMode.PUBLISHER, config.simulatorMode()); + assertEquals(SimulatorMode.PUBLISHER_CLIENT, config.simulatorMode()); + } + + @Test + void testSimulatorPublishServerMode() { + BlockStreamConfig config = getBlockStreamConfigBuilder() + .simulatorMode(SimulatorMode.PUBLISHER_SERVER) + .build(); + + assertEquals(SimulatorMode.PUBLISHER_SERVER, config.simulatorMode()); } @Test diff --git a/simulator/src/test/java/com/hedera/block/simulator/config/data/StreamStatusTest.java b/simulator/src/test/java/com/hedera/block/simulator/config/data/StreamStatusTest.java index 35f5b2ffb..12dd60dbd 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/config/data/StreamStatusTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/config/data/StreamStatusTest.java @@ -19,9 +19,18 @@ void testBuilderDefaultValues() { StreamStatus streamStatus = StreamStatus.builder().build(); assertEquals(0, streamStatus.publishedBlocks(), "Default publishedBlocks should be 0"); + assertEquals(0, streamStatus.processedBlocks(), "Default processedBlocks should be 0"); assertEquals(0, streamStatus.consumedBlocks(), "Default consumedBlocks should be 0"); - assertNotNull(streamStatus.lastKnownPublisherStatuses(), "lastKnownPublisherStatuses should not be null"); - assertTrue(streamStatus.lastKnownPublisherStatuses().isEmpty(), "lastKnownPublisherStatuses should be empty"); + assertNotNull( + streamStatus.lastKnownPublisherClientStatuses(), "lastKnownPublisherClientStatuses should not be null"); + assertTrue( + streamStatus.lastKnownPublisherClientStatuses().isEmpty(), + "lastKnownPublisherClientStatuses should be empty"); + assertNotNull( + streamStatus.lastKnownPublisherServerStatuses(), "lastKnownPublisherServerStatuses should not be null"); + assertTrue( + streamStatus.lastKnownPublisherServerStatuses().isEmpty(), + "lastKnownPublisherServerStatuses should be empty"); assertNotNull(streamStatus.lastKnownConsumersStatuses(), "lastKnownConsumersStatuses should not be null"); assertTrue(streamStatus.lastKnownConsumersStatuses().isEmpty(), "lastKnownConsumersStatuses should be empty"); } @@ -33,8 +42,10 @@ void testBuilderWithValues() { StreamStatus streamStatus = StreamStatus.builder() .publishedBlocks(10) + .processedBlocks(5) .consumedBlocks(8) - .lastKnownPublisherStatuses(publisherStatuses) + .lastKnownPublisherClientStatuses(publisherStatuses) + .lastKnownPublisherServerStatuses(publisherStatuses) .lastKnownConsumersStatuses(consumerStatuses) .build(); @@ -42,8 +53,12 @@ void testBuilderWithValues() { assertEquals(8, streamStatus.consumedBlocks(), "consumedBlocks should be 8"); assertIterableEquals( publisherStatuses, - streamStatus.lastKnownPublisherStatuses(), - "lastKnownPublisherStatuses should match"); + streamStatus.lastKnownPublisherClientStatuses(), + "lastKnownPublisherClientStatuses should match"); + assertIterableEquals( + publisherStatuses, + streamStatus.lastKnownPublisherServerStatuses(), + "lastKnownPublisherServerStatuses should match"); assertIterableEquals( consumerStatuses, streamStatus.lastKnownConsumersStatuses(), "lastKnownConsumersStatuses should match"); } @@ -53,8 +68,10 @@ void testBuilderSetters() { StreamStatus.Builder builder = StreamStatus.builder(); builder.publishedBlocks(5); + builder.processedBlocks(5); builder.consumedBlocks(3); - builder.lastKnownPublisherStatuses(List.of("PubStatus")); + builder.lastKnownPublisherClientStatuses(List.of("PubStatus")); + builder.lastKnownPublisherServerStatuses(List.of("PubStatus")); builder.lastKnownConsumersStatuses(List.of("ConStatus")); StreamStatus streamStatus = builder.build(); @@ -63,8 +80,12 @@ void testBuilderSetters() { assertEquals(3, streamStatus.consumedBlocks(), "consumedBlocks should be 3"); assertIterableEquals( List.of("PubStatus"), - streamStatus.lastKnownPublisherStatuses(), - "lastKnownPublisherStatuses should match"); + streamStatus.lastKnownPublisherClientStatuses(), + "lastKnownPublisherClientStatuses should match"); + assertIterableEquals( + List.of("PubStatus"), + streamStatus.lastKnownPublisherServerStatuses(), + "lastKnownPublisherServerStatuses should match"); assertIterableEquals( List.of("ConStatus"), streamStatus.lastKnownConsumersStatuses(), @@ -81,8 +102,16 @@ void testBuilderDefaultConstructor() { assertEquals(0, streamStatus.publishedBlocks(), "Default publishedBlocks should be 0"); assertEquals(0, streamStatus.consumedBlocks(), "Default consumedBlocks should be 0"); - assertNotNull(streamStatus.lastKnownPublisherStatuses(), "lastKnownPublisherStatuses should not be null"); - assertTrue(streamStatus.lastKnownPublisherStatuses().isEmpty(), "lastKnownPublisherStatuses should be empty"); + assertNotNull( + streamStatus.lastKnownPublisherClientStatuses(), "lastKnownPublisherClientStatuses should not be null"); + assertTrue( + streamStatus.lastKnownPublisherClientStatuses().isEmpty(), + "lastKnownPublisherClientStatuses should be empty"); + assertNotNull( + streamStatus.lastKnownPublisherServerStatuses(), "lastKnownPublisherServerStatuses should not be null"); + assertTrue( + streamStatus.lastKnownPublisherServerStatuses().isEmpty(), + "lastKnownPublisherServerStatuses should be empty"); assertNotNull(streamStatus.lastKnownConsumersStatuses(), "lastKnownConsumersStatuses should not be null"); assertTrue(streamStatus.lastKnownConsumersStatuses().isEmpty(), "lastKnownConsumersStatuses should be empty"); } @@ -94,22 +123,30 @@ void testEqualsAndHashCode() { StreamStatus streamStatus1 = StreamStatus.builder() .publishedBlocks(5) + .processedBlocks(5) .consumedBlocks(3) - .lastKnownPublisherStatuses(publisherStatuses) + .lastKnownPublisherClientStatuses(publisherStatuses) + .lastKnownPublisherServerStatuses(publisherStatuses) .lastKnownConsumersStatuses(consumerStatuses) .build(); StreamStatus streamStatus2 = StreamStatus.builder() .publishedBlocks(5) + .processedBlocks(5) .consumedBlocks(3) - .lastKnownPublisherStatuses(publisherStatuses) + .lastKnownPublisherClientStatuses(publisherStatuses) + .lastKnownPublisherServerStatuses(publisherStatuses) .lastKnownConsumersStatuses(consumerStatuses) .build(); assertIterableEquals( - streamStatus1.lastKnownPublisherStatuses(), - streamStatus2.lastKnownPublisherStatuses(), - "lastKnownPublisherStatuses should match"); + streamStatus1.lastKnownPublisherClientStatuses(), + streamStatus2.lastKnownPublisherClientStatuses(), + "lastKnownPublisherClientStatuses should match"); + assertIterableEquals( + streamStatus1.lastKnownPublisherServerStatuses(), + streamStatus2.lastKnownPublisherServerStatuses(), + "lastKnownPublisherServerStatuses should match"); assertIterableEquals( streamStatus1.lastKnownConsumersStatuses(), streamStatus2.lastKnownConsumersStatuses(), @@ -118,11 +155,17 @@ void testEqualsAndHashCode() { @Test void testNotEquals() { - StreamStatus streamStatus1 = - StreamStatus.builder().publishedBlocks(5).consumedBlocks(3).build(); + StreamStatus streamStatus1 = StreamStatus.builder() + .publishedBlocks(5) + .processedBlocks(5) + .consumedBlocks(3) + .build(); - StreamStatus streamStatus2 = - StreamStatus.builder().publishedBlocks(6).consumedBlocks(3).build(); + StreamStatus streamStatus2 = StreamStatus.builder() + .publishedBlocks(6) + .processedBlocks(5) + .consumedBlocks(3) + .build(); assertNotEquals(streamStatus1, streamStatus2, "StreamStatus instances should not be equal"); } @@ -130,12 +173,15 @@ void testNotEquals() { @Test void testToString() { List publisherStatuses = List.of("Pub1"); + List publisherServerStatuses = List.of("PubServer1"); List consumerStatuses = List.of("Con1"); StreamStatus streamStatus = StreamStatus.builder() .publishedBlocks(5) + .processedBlocks(5) .consumedBlocks(3) - .lastKnownPublisherStatuses(publisherStatuses) + .lastKnownPublisherClientStatuses(publisherStatuses) + .lastKnownPublisherServerStatuses(publisherServerStatuses) .lastKnownConsumersStatuses(consumerStatuses) .build(); @@ -143,10 +189,14 @@ void testToString() { assertNotNull(toString, "toString() should not return null"); assertTrue(toString.contains("publishedBlocks=5"), "toString() should contain 'publishedBlocks=5'"); + assertTrue(toString.contains("processedBlocks=5"), "toString() should contain 'processedBlocks=5'"); assertTrue(toString.contains("consumedBlocks=3"), "toString() should contain 'consumedBlocks=3'"); assertTrue( - toString.contains("lastKnownPublisherStatuses=[Pub1]"), - "toString() should contain 'lastKnownPublisherStatuses=[Pub1]'"); + toString.contains("lastKnownPublisherClientStatuses=[Pub1]"), + "toString() should contain 'lastKnownPublisherClientStatuses=[Pub1]'"); + assertTrue( + toString.contains("lastKnownPublisherServerStatuses=[PubServer1]"), + "toString() should contain 'lastKnownPublisherServerStatuses=[PubServer1]'"); assertTrue( toString.contains("lastKnownConsumersStatuses=[Con1]"), "toString() should contain 'lastKnownConsumersStatuses=[Con1]'"); @@ -155,15 +205,18 @@ void testToString() { @Test void testStatusesLists() { List publisherStatuses = new ArrayList<>(); + List publisherServerStatuses = new ArrayList<>(); List consumerStatuses = new ArrayList<>(); publisherStatuses.add("Publisher1"); + publisherServerStatuses.add("PublisherServer1"); consumerStatuses.add("Consumer1"); StreamStatus streamStatus = StreamStatus.builder() .publishedBlocks(1) .consumedBlocks(1) - .lastKnownPublisherStatuses(publisherStatuses) + .lastKnownPublisherClientStatuses(publisherStatuses) + .lastKnownPublisherServerStatuses(publisherServerStatuses) .lastKnownConsumersStatuses(consumerStatuses) .build(); @@ -172,8 +225,12 @@ void testStatusesLists() { assertNotEquals( List.of("Publisher1", "Publisher2"), - streamStatus.lastKnownPublisherStatuses(), - "lastKnownPublisherStatuses should be immutable"); + streamStatus.lastKnownPublisherClientStatuses(), + "lastKnownPublisherClientStatuses should be immutable"); + assertNotEquals( + List.of("PublisherServer1", "PublisherServer2"), + streamStatus.lastKnownPublisherServerStatuses(), + "lastKnownPublisherServerStatuses should be immutable"); assertNotEquals( List.of("Consumer1", "Consumer2"), streamStatus.lastKnownConsumersStatuses(), @@ -184,8 +241,10 @@ void testStatusesLists() { void testNullLists() { assertThrows(NullPointerException.class, () -> StreamStatus.builder() .publishedBlocks(0) + .processedBlocks(0) .consumedBlocks(0) - .lastKnownPublisherStatuses(null) + .lastKnownPublisherClientStatuses(null) + .lastKnownPublisherServerStatuses(null) .lastKnownConsumersStatuses(null) .build()); } @@ -194,17 +253,24 @@ void testNullLists() { void testBuilderChaining() { StreamStatus streamStatus = StreamStatus.builder() .publishedBlocks(2) + .processedBlocks(2) .consumedBlocks(2) - .lastKnownPublisherStatuses(List.of("PubStatus")) + .lastKnownPublisherClientStatuses(List.of("PubStatus")) + .lastKnownPublisherServerStatuses(List.of("PubStatus")) .lastKnownConsumersStatuses(List.of("ConStatus")) .build(); assertEquals(2, streamStatus.publishedBlocks(), "publishedBlocks should be 2"); + assertEquals(2, streamStatus.processedBlocks(), "processedBlocks should be 2"); assertEquals(2, streamStatus.consumedBlocks(), "consumedBlocks should be 2"); assertIterableEquals( List.of("PubStatus"), - streamStatus.lastKnownPublisherStatuses(), - "lastKnownPublisherStatuses should match"); + streamStatus.lastKnownPublisherClientStatuses(), + "lastKnownPublisherClientStatuses should match"); + assertIterableEquals( + List.of("PubStatus"), + streamStatus.lastKnownPublisherServerStatuses(), + "lastKnownPublisherServerStatuses should match"); assertIterableEquals( List.of("ConStatus"), streamStatus.lastKnownConsumersStatuses(), diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcServerImplTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcServerImplTest.java new file mode 100644 index 000000000..44e15fbdb --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcServerImplTest.java @@ -0,0 +1,154 @@ +// SPDX-License-Identifier: Apache-2.0 +package com.hedera.block.simulator.grpc.impl; + +import static com.hedera.block.simulator.TestUtils.findFreePort; +import static com.hedera.block.simulator.TestUtils.getTestMetrics; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.when; + +import com.hedera.block.simulator.TestUtils; +import com.hedera.block.simulator.config.data.BlockStreamConfig; +import com.hedera.block.simulator.config.data.GrpcConfig; +import com.hedera.block.simulator.grpc.PublishStreamGrpcServer; +import com.hedera.block.simulator.metrics.MetricsService; +import com.hedera.block.simulator.metrics.MetricsServiceImpl; +import com.hedera.hapi.block.protoc.BlockItemSet; +import com.hedera.hapi.block.protoc.BlockStreamServiceGrpc; +import com.hedera.hapi.block.protoc.PublishStreamRequest; +import com.hedera.hapi.block.protoc.PublishStreamResponse; +import com.hedera.hapi.block.stream.output.protoc.BlockHeader; +import com.hedera.hapi.block.stream.protoc.BlockItem; +import com.hedera.hapi.block.stream.protoc.BlockProof; +import com.swirlds.config.api.Configuration; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +class PublishStreamGrpcServerImplTest { + + @Mock + private GrpcConfig grpcConfig; + + private BlockStreamConfig blockStreamConfig; + private MetricsService metricsService; + private PublishStreamGrpcServer publishStreamGrpcServer; + private ManagedChannel channel; + private int serverPort; + + @BeforeEach + void setUp() throws IOException { + MockitoAnnotations.openMocks(this); + + serverPort = findFreePort(); + when(grpcConfig.port()).thenReturn(serverPort); + + final Configuration config = TestUtils.getTestConfiguration(); + blockStreamConfig = config.getConfigData(BlockStreamConfig.class); + metricsService = new MetricsServiceImpl(getTestMetrics(config)); + + publishStreamGrpcServer = new PublishStreamGrpcServerImpl(grpcConfig, blockStreamConfig, metricsService); + publishStreamGrpcServer.init(); + publishStreamGrpcServer.start(); + + channel = ManagedChannelBuilder.forAddress("localhost", serverPort) + .usePlaintext() + .build(); + } + + @AfterEach + void tearDown() throws InterruptedException { + if (channel != null) { + channel.shutdown(); + channel.awaitTermination(5, TimeUnit.SECONDS); + } + if (publishStreamGrpcServer != null) { + publishStreamGrpcServer.shutdown(); + } + } + + @Test + void testInit() { + assertTrue(publishStreamGrpcServer.getLastKnownStatuses().isEmpty()); + assertEquals(0, publishStreamGrpcServer.getProcessedBlocks()); + } + + @Test + void testPublishBlockStream() throws InterruptedException { + // Create a latch to wait for the stream to complete + CountDownLatch latch = new CountDownLatch(1); + + // Create the blocking stub + BlockStreamServiceGrpc.BlockStreamServiceStub stub = BlockStreamServiceGrpc.newStub(channel); + + // Create response observer + StreamObserver responseObserver = new StreamObserver<>() { + @Override + public void onNext(PublishStreamResponse response) { + // Verify response contains acknowledgement + assertTrue(response.hasAcknowledgement()); + } + + @Override + public void onError(Throwable t) { + latch.countDown(); + } + + @Override + public void onCompleted() { + latch.countDown(); + } + }; + + // Start the stream + StreamObserver requestObserver = stub.publishBlockStream(responseObserver); + + // Send some test blocks + for (int i = 0; i < 3; i++) { + BlockItem blockItemHeader = BlockItem.newBuilder() + .setBlockHeader(BlockHeader.newBuilder().setNumber(i).build()) + .build(); + + BlockItem blockItemProof = BlockItem.newBuilder() + .setBlockProof(BlockProof.newBuilder().setBlock(i).build()) + .build(); + + BlockItemSet blockItems = BlockItemSet.newBuilder() + .addBlockItems(blockItemHeader) + .addBlockItems(blockItemProof) + .build(); + + PublishStreamRequest request = + PublishStreamRequest.newBuilder().setBlockItems(blockItems).build(); + + requestObserver.onNext(request); + } + + // Complete the stream + requestObserver.onCompleted(); + + // Wait for the stream to complete + assertTrue(latch.await(5, TimeUnit.SECONDS)); + + // Verify the server state + List statuses = publishStreamGrpcServer.getLastKnownStatuses(); + assertFalse(statuses.isEmpty()); + assertEquals(3, publishStreamGrpcServer.getProcessedBlocks()); + } + + @Test + void testServerShutdown() throws InterruptedException { + publishStreamGrpcServer.shutdown(); + // Verify channel is terminated after shutdown + channel.shutdown(); + assertTrue(channel.awaitTermination(5, TimeUnit.SECONDS)); + } +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamServerObserverTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamServerObserverTest.java new file mode 100644 index 000000000..f50c9bc58 --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamServerObserverTest.java @@ -0,0 +1,123 @@ +// SPDX-License-Identifier: Apache-2.0 +package com.hedera.block.simulator.grpc.impl; + +import static com.hedera.block.simulator.TestUtils.getTestMetrics; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import com.hedera.block.simulator.TestUtils; +import com.hedera.block.simulator.metrics.MetricsService; +import com.hedera.block.simulator.metrics.MetricsServiceImpl; +import com.hedera.hapi.block.protoc.BlockItemSet; +import com.hedera.hapi.block.protoc.PublishStreamRequest; +import com.hedera.hapi.block.protoc.PublishStreamResponse; +import com.hedera.hapi.block.stream.protoc.BlockItem; +import com.hedera.hapi.block.stream.protoc.BlockProof; +import com.swirlds.config.api.Configuration; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.ArrayDeque; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +class PublishStreamServerObserverTest { + + private StreamObserver responseObserver; + private MetricsService metricsService; + private ArrayDeque lastKnownStatuses; + private PublishStreamServerObserver observer; + private static final int CAPACITY = 10; + + @BeforeEach + void setUp() throws IOException { + final Configuration config = TestUtils.getTestConfiguration(); + metricsService = new MetricsServiceImpl(getTestMetrics(config)); + + responseObserver = mock(StreamObserver.class); + lastKnownStatuses = new ArrayDeque<>(); + observer = new PublishStreamServerObserver(responseObserver, metricsService, lastKnownStatuses, CAPACITY); + } + + @Test + void testConstructorWithNullArguments() { + assertThrows( + NullPointerException.class, + () -> new PublishStreamServerObserver(null, metricsService, lastKnownStatuses, CAPACITY)); + assertThrows( + NullPointerException.class, + () -> new PublishStreamServerObserver(responseObserver, metricsService, null, CAPACITY)); + } + + @Test + void testOnNextWithBlockProof() { + // Create a BlockProof + BlockProof blockProof = BlockProof.newBuilder().setBlock(123L).build(); + + // Create a BlockItem with the BlockProof + BlockItem blockItem = BlockItem.newBuilder().setBlockProof(blockProof).build(); + + // Create BlockItemSet + BlockItemSet blockItemSet = + BlockItemSet.newBuilder().addBlockItems(blockItem).build(); + + // Create PublishStreamRequest + PublishStreamRequest request = + PublishStreamRequest.newBuilder().setBlockItems(blockItemSet).build(); + + // Call onNext + observer.onNext(request); + + // Verify the response was sent + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(PublishStreamResponse.class); + verify(responseObserver).onNext(responseCaptor.capture()); + + // Verify the response contains correct block number + PublishStreamResponse capturedResponse = responseCaptor.getValue(); + assertEquals(123L, capturedResponse.getAcknowledgement().getBlockAck().getBlockNumber()); + + // Verify status was stored + assertEquals(1, lastKnownStatuses.size()); + assertEquals(request.toString(), lastKnownStatuses.getLast()); + } + + @Test + void testOnNextWithoutBlockItems() { + PublishStreamRequest request = PublishStreamRequest.newBuilder().build(); + observer.onNext(request); + + // Verify no response was sent + verify(responseObserver, never()).onNext(any()); + + // Verify status was stored + assertEquals(1, lastKnownStatuses.size()); + assertEquals(request.toString(), lastKnownStatuses.getLast()); + } + + @Test + void testStatusHistoryCapacity() { + PublishStreamRequest request = PublishStreamRequest.newBuilder().build(); + + // Fill beyond capacity + for (int i = 0; i < CAPACITY + 5; i++) { + observer.onNext(request); + } + + // Verify size is maintained at capacity + assertEquals(CAPACITY, lastKnownStatuses.size()); + } + + @Test + void testOnError() { + Throwable error = new RuntimeException("Test error"); + observer.onError(error); + + verifyNoInteractions(responseObserver); + } + + @Test + void testOnCompleted() { + observer.onCompleted(); + verify(responseObserver).onCompleted(); + } +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/mode/CombinedModeHandlerTest.java b/simulator/src/test/java/com/hedera/block/simulator/mode/CombinedModeHandlerTest.java deleted file mode 100644 index a33e779d7..000000000 --- a/simulator/src/test/java/com/hedera/block/simulator/mode/CombinedModeHandlerTest.java +++ /dev/null @@ -1,18 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -package com.hedera.block.simulator.mode; - -import static org.junit.jupiter.api.Assertions.assertThrows; - -import org.junit.jupiter.api.Test; - -public class CombinedModeHandlerTest { - - private CombinedModeHandler combinedModeHandler; - - @Test - void testStartThrowsUnsupportedOperationException() { - combinedModeHandler = new CombinedModeHandler(); - - assertThrows(UnsupportedOperationException.class, () -> combinedModeHandler.start()); - } -} diff --git a/simulator/src/test/java/com/hedera/block/simulator/mode/ConsumerModeHandlerTest.java b/simulator/src/test/java/com/hedera/block/simulator/mode/ConsumerModeHandlerTest.java index 423d8d972..108d6f8ae 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/mode/ConsumerModeHandlerTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/mode/ConsumerModeHandlerTest.java @@ -7,6 +7,7 @@ import static org.mockito.Mockito.verify; import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient; +import com.hedera.block.simulator.mode.impl.ConsumerModeHandler; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherModeHandlerTest.java b/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherClientModeHandlerTest.java similarity index 89% rename from simulator/src/test/java/com/hedera/block/simulator/mode/PublisherModeHandlerTest.java rename to simulator/src/test/java/com/hedera/block/simulator/mode/PublisherClientModeHandlerTest.java index df1d2057f..8fc961165 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherModeHandlerTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherClientModeHandlerTest.java @@ -17,6 +17,7 @@ import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; import com.hedera.block.simulator.metrics.MetricsService; import com.hedera.block.simulator.metrics.MetricsServiceImpl; +import com.hedera.block.simulator.mode.impl.PublisherClientModeHandler; import com.hedera.hapi.block.stream.protoc.Block; import com.hedera.hapi.block.stream.protoc.BlockItem; import com.swirlds.config.api.Configuration; @@ -29,7 +30,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; -public class PublisherModeHandlerTest { +public class PublisherClientModeHandlerTest { @Mock private BlockStreamConfig blockStreamConfig; @@ -43,7 +44,7 @@ public class PublisherModeHandlerTest { @Mock private MetricsService metricsService; - private PublisherModeHandler publisherModeHandler; + private PublisherClientModeHandler publisherClientModeHandler; @BeforeEach void setUp() throws IOException { @@ -61,7 +62,7 @@ void testStartWithMillisPerBlockStreaming_WithBlocks() throws Exception { when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.MILLIS_PER_BLOCK); when(blockStreamConfig.millisecondsPerBlock()).thenReturn(0); // No delay for testing - publisherModeHandler = new PublisherModeHandler( + publisherClientModeHandler = new PublisherClientModeHandler( blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); Block block1 = mock(Block.class); @@ -76,7 +77,7 @@ void testStartWithMillisPerBlockStreaming_WithBlocks() throws Exception { when(publishStreamGrpcClient.streamBlock(block1)).thenReturn(true); when(publishStreamGrpcClient.streamBlock(block2)).thenReturn(true); - publisherModeHandler.start(); + publisherClientModeHandler.start(); verify(publishStreamGrpcClient).streamBlock(block1); verify(publishStreamGrpcClient).streamBlock(block2); @@ -88,12 +89,12 @@ void testStartWithMillisPerBlockStreaming_WithBlocks() throws Exception { void testStartWithMillisPerBlockStreaming_NoBlocks() throws Exception { when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.MILLIS_PER_BLOCK); - publisherModeHandler = new PublisherModeHandler( + publisherClientModeHandler = new PublisherClientModeHandler( blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); when(blockStreamManager.getNextBlock()).thenReturn(null); - publisherModeHandler.start(); + publisherClientModeHandler.start(); verify(publishStreamGrpcClient, never()).streamBlock(any(Block.class)); verify(blockStreamManager).getNextBlock(); @@ -103,7 +104,7 @@ void testStartWithMillisPerBlockStreaming_NoBlocks() throws Exception { void testStartWithMillisPerBlockStreaming_ShouldPublishFalse() throws Exception { when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.MILLIS_PER_BLOCK); - publisherModeHandler = new PublisherModeHandler( + publisherClientModeHandler = new PublisherClientModeHandler( blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); Block block1 = mock(Block.class); @@ -118,8 +119,8 @@ void testStartWithMillisPerBlockStreaming_ShouldPublishFalse() throws Exception when(publishStreamGrpcClient.streamBlock(block1)).thenReturn(true); when(publishStreamGrpcClient.streamBlock(block2)).thenReturn(true); - publisherModeHandler.stop(); - publisherModeHandler.start(); + publisherClientModeHandler.stop(); + publisherClientModeHandler.start(); verify(publishStreamGrpcClient, never()).streamBlock(any(Block.class)); verify(blockStreamManager).getNextBlock(); @@ -129,13 +130,13 @@ void testStartWithMillisPerBlockStreaming_ShouldPublishFalse() throws Exception void testStartWithMillisPerBlockStreaming_NoBlocksAndShouldPublishFalse() throws Exception { when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.MILLIS_PER_BLOCK); - publisherModeHandler = new PublisherModeHandler( + publisherClientModeHandler = new PublisherClientModeHandler( blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); when(blockStreamManager.getNextBlock()).thenReturn(null); - publisherModeHandler.stop(); - publisherModeHandler.start(); + publisherClientModeHandler.stop(); + publisherClientModeHandler.start(); verify(publishStreamGrpcClient, never()).streamBlock(any(Block.class)); verify(blockStreamManager).getNextBlock(); @@ -147,7 +148,7 @@ void testStartWithConstantRateStreaming_WithinMaxItems() throws Exception { when(blockStreamConfig.delayBetweenBlockItems()).thenReturn(0); when(blockStreamConfig.maxBlockItemsToStream()).thenReturn(5); - publisherModeHandler = new PublisherModeHandler( + publisherClientModeHandler = new PublisherClientModeHandler( blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); when(publishStreamGrpcClient.streamBlock(any(Block.class))).thenReturn(true); @@ -170,7 +171,7 @@ void testStartWithConstantRateStreaming_WithinMaxItems() throws Exception { when(publishStreamGrpcClient.streamBlock(block1)).thenReturn(true); when(publishStreamGrpcClient.streamBlock(block2)).thenReturn(true); - publisherModeHandler.start(); + publisherClientModeHandler.start(); verify(publishStreamGrpcClient).streamBlock(block1); verify(publishStreamGrpcClient).streamBlock(block2); @@ -184,7 +185,7 @@ void testStartWithConstantRateStreaming_ExceedingMaxItems() throws Exception { when(blockStreamConfig.delayBetweenBlockItems()).thenReturn(0); when(blockStreamConfig.maxBlockItemsToStream()).thenReturn(5); - publisherModeHandler = new PublisherModeHandler( + publisherClientModeHandler = new PublisherClientModeHandler( blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); when(publishStreamGrpcClient.streamBlock(any(Block.class))).thenReturn(true); @@ -214,7 +215,7 @@ void testStartWithConstantRateStreaming_ExceedingMaxItems() throws Exception { when(publishStreamGrpcClient.streamBlock(block3)).thenReturn(true); when(publishStreamGrpcClient.streamBlock(block4)).thenReturn(true); - publisherModeHandler.start(); + publisherClientModeHandler.start(); verify(publishStreamGrpcClient).streamBlock(block1); verify(publishStreamGrpcClient).streamBlock(block2); @@ -226,12 +227,12 @@ void testStartWithConstantRateStreaming_ExceedingMaxItems() throws Exception { @Test void testStartWithConstantRateStreaming_NoBlocks() throws Exception { when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.CONSTANT_RATE); - publisherModeHandler = new PublisherModeHandler( + publisherClientModeHandler = new PublisherClientModeHandler( blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); when(blockStreamManager.getNextBlock()).thenReturn(null); - publisherModeHandler.start(); + publisherClientModeHandler.start(); verify(publishStreamGrpcClient, never()).streamBlock(any(Block.class)); verify(blockStreamManager).getNextBlock(); @@ -241,12 +242,12 @@ void testStartWithConstantRateStreaming_NoBlocks() throws Exception { void testStartWithExceptionDuringStreaming() throws Exception { when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.MILLIS_PER_BLOCK); - publisherModeHandler = new PublisherModeHandler( + publisherClientModeHandler = new PublisherClientModeHandler( blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); when(blockStreamManager.getNextBlock()).thenThrow(new IOException("Test exception")); - assertThrows(IOException.class, () -> publisherModeHandler.start()); + assertThrows(IOException.class, () -> publisherClientModeHandler.start()); verify(publishStreamGrpcClient, never()).streamBlock(any(Block.class)); verify(blockStreamManager).getNextBlock(); @@ -259,7 +260,7 @@ void testMillisPerBlockStreaming_streamSuccessBecomesFalse() throws Exception { when(blockStreamConfig.streamingMode()).thenReturn(StreamingMode.MILLIS_PER_BLOCK); when(blockStreamConfig.millisecondsPerBlock()).thenReturn(1000); - publisherModeHandler = new PublisherModeHandler( + publisherClientModeHandler = new PublisherClientModeHandler( blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); Block block1 = mock(Block.class); @@ -273,7 +274,7 @@ void testMillisPerBlockStreaming_streamSuccessBecomesFalse() throws Exception { when(publishStreamGrpcClient.streamBlock(block1)).thenReturn(true); when(publishStreamGrpcClient.streamBlock(block2)).thenReturn(false); - publisherModeHandler.start(); + publisherClientModeHandler.start(); verify(publishStreamGrpcClient).streamBlock(block1); verify(publishStreamGrpcClient).streamBlock(block2); @@ -287,7 +288,7 @@ void testConstantRateStreaming_streamSuccessBecomesFalse() throws Exception { when(blockStreamConfig.delayBetweenBlockItems()).thenReturn(0); when(blockStreamConfig.maxBlockItemsToStream()).thenReturn(100); - publisherModeHandler = new PublisherModeHandler( + publisherClientModeHandler = new PublisherClientModeHandler( blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService); Block block1 = mock(Block.class); @@ -307,7 +308,7 @@ void testConstantRateStreaming_streamSuccessBecomesFalse() throws Exception { when(publishStreamGrpcClient.streamBlock(block1)).thenReturn(true); when(publishStreamGrpcClient.streamBlock(block2)).thenReturn(false); - publisherModeHandler.start(); + publisherClientModeHandler.start(); verify(publishStreamGrpcClient).streamBlock(block1); verify(publishStreamGrpcClient).streamBlock(block2); diff --git a/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherServerModeHandlerTest.java b/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherServerModeHandlerTest.java new file mode 100644 index 000000000..0fbd0b447 --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/mode/PublisherServerModeHandlerTest.java @@ -0,0 +1,52 @@ +// SPDX-License-Identifier: Apache-2.0 +package com.hedera.block.simulator.mode; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.*; + +import com.hedera.block.simulator.grpc.PublishStreamGrpcServer; +import com.hedera.block.simulator.mode.impl.PublisherServerModeHandler; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class PublisherServerModeHandlerTest { + + @Mock + private PublishStreamGrpcServer publishStreamGrpcServer; + + private PublisherServerModeHandler publisherServerModeHandler; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + publisherServerModeHandler = new PublisherServerModeHandler(publishStreamGrpcServer); + } + + @Test + void testConstructorWithNullArguments() { + assertThrows(NullPointerException.class, () -> new PublisherServerModeHandler(null)); + } + + @Test + void testInit() { + publisherServerModeHandler.init(); + verify(publishStreamGrpcServer).init(); + } + + @Test + void testStop() throws InterruptedException { + publisherServerModeHandler.stop(); + verify(publishStreamGrpcServer).shutdown(); + } + + @Test + void testStop_throwsException() throws InterruptedException { + doThrow(new InterruptedException("Test exception")) + .when(publishStreamGrpcServer) + .shutdown(); + + assertThrows(InterruptedException.class, () -> publisherServerModeHandler.stop()); + } +} diff --git a/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveEndpointBehaviourTests.java b/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveEndpointBehaviourTests.java index 12a4e5d4f..d82a25d83 100644 --- a/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveEndpointBehaviourTests.java +++ b/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveEndpointBehaviourTests.java @@ -61,11 +61,11 @@ void verifyPublishBlockStreamEndpoint() throws IOException, InterruptedException // We just need to make sure that number of published blocks is equal or greater than the statuses. Statuses are // tracked in a queue to avoid unnecessary memory usage, therefore will always be less or equal to published. assertTrue(streamStatus.publishedBlocks() - >= streamStatus.lastKnownPublisherStatuses().size()); + >= streamStatus.lastKnownPublisherClientStatuses().size()); // Verify each status contains the word "acknowledgement" streamStatus - .lastKnownPublisherStatuses() + .lastKnownPublisherClientStatuses() .forEach(status -> assertTrue(status.toLowerCase().contains("acknowledgement"))); } }