Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for PublishBlockStream to simulator #548

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions simulator/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,32 @@ testModuleInfo {
}

// Task to run simulator in Publisher mode
tasks.register<JavaExec>("runPublisher") {
description = "Run the simulator in Publisher mode"
tasks.register<JavaExec>("runPublisherClient") {
description = "Run the simulator in Publisher Client mode"
group = "application"

mainClass = application.mainClass
mainModule = application.mainModule
classpath = sourceSets["main"].runtimeClasspath

environment("BLOCK_STREAM_SIMULATOR_MODE", "PUBLISHER")
environment("BLOCK_STREAM_SIMULATOR_MODE", "PUBLISHER_CLIENT")
environment("PROMETHEUS_ENDPOINT_ENABLED", "true")
environment("PROMETHEUS_ENDPOINT_PORT_NUMBER", "9998")
}

tasks.register<JavaExec>("runPublisherServer") {
description = "Run the simulator in Publisher Server mode"
group = "application"

mainClass = application.mainClass
mainModule = application.mainModule
classpath = sourceSets["main"].runtimeClasspath

environment("BLOCK_STREAM_SIMULATOR_MODE", "PUBLISHER_SERVER")
environment("PROMETHEUS_ENDPOINT_ENABLED", "true")
environment("PROMETHEUS_ENDPOINT_PORT_NUMBER", "9996")
}

// Task to run simulator in Consumer mode
tasks.register<JavaExec>("runConsumer") {
description = "Run the simulator in Consumer mode"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,12 @@
import static java.lang.System.Logger.Level.INFO;
import static java.util.Objects.requireNonNull;

import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.config.data.StreamStatus;
import com.hedera.block.simulator.config.types.SimulatorMode;
import com.hedera.block.simulator.exception.BlockSimulatorParsingException;
import com.hedera.block.simulator.generator.BlockStreamManager;
import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient;
import com.hedera.block.simulator.grpc.PublishStreamGrpcClient;
import com.hedera.block.simulator.metrics.MetricsService;
import com.hedera.block.simulator.mode.CombinedModeHandler;
import com.hedera.block.simulator.mode.ConsumerModeHandler;
import com.hedera.block.simulator.mode.PublisherModeHandler;
import com.hedera.block.simulator.grpc.PublishStreamGrpcServer;
import com.hedera.block.simulator.mode.SimulatorModeHandler;
import com.swirlds.config.api.Configuration;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Down Expand Up @@ -46,6 +41,7 @@ public class BlockStreamSimulatorApp {

// Service dependencies
private final PublishStreamGrpcClient publishStreamGrpcClient;
private final PublishStreamGrpcServer publishStreamGrpcServer;
private final ConsumerStreamGrpcClient consumerStreamGrpcClient;
private final SimulatorModeHandler simulatorModeHandler;

Expand All @@ -62,7 +58,6 @@ public class BlockStreamSimulatorApp {
* generation
* @param publishStreamGrpcClient The gRPC client for publishing blocks
* @param consumerStreamGrpcClient The gRPC client for consuming blocks
* @param metricsService The service for recording metrics
* @throws NullPointerException if any parameter is null
* @throws IllegalArgumentException if an unknown simulator mode is configured
*/
Expand All @@ -71,27 +66,18 @@ public BlockStreamSimulatorApp(
@NonNull Configuration configuration,
@NonNull BlockStreamManager blockStreamManager,
@NonNull PublishStreamGrpcClient publishStreamGrpcClient,
@NonNull PublishStreamGrpcServer publishStreamGrpcServer,
@NonNull ConsumerStreamGrpcClient consumerStreamGrpcClient,
@NonNull MetricsService metricsService) {
@NonNull SimulatorModeHandler simulatorModeHandler) {

requireNonNull(configuration);
requireNonNull(blockStreamManager);
loadLoggingProperties();

this.publishStreamGrpcClient = requireNonNull(publishStreamGrpcClient);
this.publishStreamGrpcServer = requireNonNull(publishStreamGrpcServer);
this.consumerStreamGrpcClient = requireNonNull(consumerStreamGrpcClient);

// Initialize the appropriate mode handler based on configuration
final BlockStreamConfig blockStreamConfig =
requireNonNull(configuration.getConfigData(BlockStreamConfig.class));
// @todo(386) Load simulator mode using dagger
final SimulatorMode simulatorMode = blockStreamConfig.simulatorMode();
this.simulatorModeHandler = switch (simulatorMode) {
case PUBLISHER -> new PublisherModeHandler(
blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService);
case CONSUMER -> new ConsumerModeHandler(consumerStreamGrpcClient);
case BOTH -> new CombinedModeHandler();
};
this.simulatorModeHandler = requireNonNull(simulatorModeHandler);
}

/**
Expand Down Expand Up @@ -148,8 +134,10 @@ public void stop() throws InterruptedException {
public StreamStatus getStreamStatus() {
return StreamStatus.builder()
.publishedBlocks(publishStreamGrpcClient.getPublishedBlocks())
.processedBlocks(publishStreamGrpcServer.getProcessedBlocks())
georgi-l95 marked this conversation as resolved.
Show resolved Hide resolved
.consumedBlocks(consumerStreamGrpcClient.getConsumedBlocks())
.lastKnownPublisherStatuses(publishStreamGrpcClient.getLastKnownStatuses())
.lastKnownPublisherClientStatuses(publishStreamGrpcClient.getLastKnownStatuses())
.lastKnownPublisherServerStatuses(publishStreamGrpcServer.getLastKnownStatuses())
.lastKnownConsumersStatuses(consumerStreamGrpcClient.getLastKnownStatuses())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.hedera.block.simulator.generator.GeneratorInjectionModule;
import com.hedera.block.simulator.grpc.GrpcInjectionModule;
import com.hedera.block.simulator.metrics.MetricsInjectionModule;
import com.hedera.block.simulator.mode.SimulatorModeInjectionModule;
import com.swirlds.config.api.Configuration;
import dagger.BindsInstance;
import dagger.Component;
Expand All @@ -18,6 +19,7 @@
ConfigInjectionModule.class,
GeneratorInjectionModule.class,
GrpcInjectionModule.class,
SimulatorModeInjectionModule.class
})
public interface BlockStreamSimulatorInjectionComponent {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/
@ConfigData("blockStream")
public record BlockStreamConfig(
@ConfigProperty(defaultValue = "PUBLISHER") SimulatorMode simulatorMode,
@ConfigProperty(defaultValue = "PUBLISHER_SERVER") SimulatorMode simulatorMode,
@ConfigProperty(defaultValue = "10") int lastKnownStatusesCapacity,
@ConfigProperty(defaultValue = "1_500_000") int delayBetweenBlockItems,
@ConfigProperty(defaultValue = "100_000") int maxBlockItemsToStream,
Expand All @@ -40,7 +40,7 @@ public static Builder builder() {
* A builder for creating instances of {@link BlockStreamConfig}.
*/
public static class Builder {
private SimulatorMode simulatorMode = SimulatorMode.PUBLISHER;
private SimulatorMode simulatorMode = SimulatorMode.PUBLISHER_CLIENT;
private int lastKnownStatusesCapacity = 10;
private int delayBetweenBlockItems = 1_500_000;
private int maxBlockItemsToStream = 10_000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,19 @@
/**
* Represents the status of the stream.
*
* @param publishedBlocks the number of published blocks
* @param publishedBlocks the number of published blocks from publish client
* @param processedBlocks the number of processed blocks from publish server
* @param consumedBlocks the number of consumed blocks
* @param lastKnownPublisherStatuses the last known publisher statuses
* @param lastKnownPublisherClientStatuses the last known statuses from publish client
* @param lastKnownPublisherServerStatuses the last known statuses from publish server
* @param lastKnownConsumersStatuses the last known consumers statuses
*/
public record StreamStatus(
long publishedBlocks,
long processedBlocks,
long consumedBlocks,
Deque<String> lastKnownPublisherStatuses,
Deque<String> lastKnownPublisherClientStatuses,
Deque<String> lastKnownPublisherServerStatuses,
Deque<String> lastKnownConsumersStatuses) {

/**
Expand All @@ -36,8 +40,10 @@ public static Builder builder() {
*/
public static class Builder {
private long publishedBlocks = 0;
private long processedBlocks = 0;
private long consumedBlocks = 0;
private Deque<String> lastKnownPublisherStatuses = new ArrayDeque<>();
private Deque<String> lastKnownPublisherClientStatuses = new ArrayDeque<>();
private Deque<String> lastKnownPublisherServerStatuses = new ArrayDeque<>();
private Deque<String> lastKnownConsumersStatuses = new ArrayDeque<>();

/**
Expand All @@ -48,7 +54,7 @@ public Builder() {
}

/**
* Sets the number of published blocks.
* Sets the number of published blocks by publish client.
*
* @param publishedBlocks the number of published blocks
* @return the builder instance
Expand All @@ -59,6 +65,18 @@ public Builder publishedBlocks(long publishedBlocks) {
return this;
}

/**
* Sets the number of processed blocks by publish server.
*
* @param processedBlocks the number of processed blocks by publish server.
* @return the builder instance
*/
public Builder processedBlocks(long processedBlocks) {
requireWhole(processedBlocks);
this.processedBlocks = processedBlocks;
return this;
}

/**
* Sets the number of consumed blocks.
*
Expand All @@ -72,14 +90,26 @@ public Builder consumedBlocks(long consumedBlocks) {
}

/**
* Sets the last known publisher statuses.
* Sets the last known publisher client statuses.
*
* @param lastKnownPublisherClientStatuses the last known publisher statuses from publish client
* @return the builder instance
*/
public Builder lastKnownPublisherClientStatuses(List<String> lastKnownPublisherClientStatuses) {
requireNonNull(lastKnownPublisherClientStatuses);
this.lastKnownPublisherClientStatuses = new ArrayDeque<>(lastKnownPublisherClientStatuses);
return this;
}

/**
* Sets the last known publisher server statuses.
*
* @param lastKnownPublisherStatuses the last known publisher statuses
* @param lastKnownPublisherServerStatuses the last known publisher statuses from publish server
* @return the builder instance
*/
public Builder lastKnownPublisherStatuses(List<String> lastKnownPublisherStatuses) {
requireNonNull(lastKnownPublisherStatuses);
this.lastKnownPublisherStatuses = new ArrayDeque<>(lastKnownPublisherStatuses);
public Builder lastKnownPublisherServerStatuses(List<String> lastKnownPublisherServerStatuses) {
requireNonNull(lastKnownPublisherServerStatuses);
this.lastKnownPublisherServerStatuses = new ArrayDeque<>(lastKnownPublisherServerStatuses);
return this;
}

Expand All @@ -102,7 +132,12 @@ public Builder lastKnownConsumersStatuses(List<String> lastKnownConsumersStatuse
*/
public StreamStatus build() {
return new StreamStatus(
publishedBlocks, consumedBlocks, lastKnownPublisherStatuses, lastKnownConsumersStatuses);
publishedBlocks,
processedBlocks,
consumedBlocks,
lastKnownPublisherClientStatuses,
lastKnownPublisherServerStatuses,
lastKnownConsumersStatuses);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@

/** The SimulatorMode enum defines the work modes of the block stream simulator. */
public enum SimulatorMode {
/**
* Indicates a work mode in which the simulator is working as both consumer and publisher.
*/
BOTH,
/**
* Indicates a work mode in which the simulator is working in consumer mode.
*/
CONSUMER,
/**
* Indicates a work mode in which the simulator is working as both consumer and publisher.
*/
PUBLISHER_SERVER,
/**
* Indicates a work mode in which the simulator is working in publisher mode.
*/
PUBLISHER
PUBLISHER_CLIENT
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import com.hedera.block.simulator.grpc.impl.ConsumerStreamGrpcClientImpl;
import com.hedera.block.simulator.grpc.impl.PublishStreamGrpcClientImpl;
import com.hedera.block.simulator.grpc.impl.PublishStreamGrpcServerImpl;
import dagger.Binds;
import dagger.Module;
import dagger.Provides;
Expand Down Expand Up @@ -33,6 +34,16 @@ public interface GrpcInjectionModule {
@Binds
ConsumerStreamGrpcClient bindConsumerStreamGrpcClient(ConsumerStreamGrpcClientImpl consumerStreamGrpcClient);

/**
* Binds the PublishStreamGrpcServer to the PublishStreamGrpcServerImpl.
*
* @param PublishStreamGrpcServer the PublishStreamGrpcServerImpl
* @return the ConsumerStreamGrpcClient
*/
@Singleton
@Binds
PublishStreamGrpcServer bindPublishStreamGrpcServer(PublishStreamGrpcServerImpl PublishStreamGrpcServer);

/**
* Provides the stream enabled flag
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.simulator.grpc;

import java.util.List;

public interface PublishStreamGrpcServer {
/**
* Initialize, opens a gRPC channel and creates the needed services with the passed configuration.
*/
void init();

/**
* Starts the gRPC server.
*/
void start();

/**
* Gets the number of processed blocks.
*
* @return the number of published blocks
*/
long getProcessedBlocks();

/**
* Gets the last known statuses.
*
* @return the last known statuses
*/
List<String> getLastKnownStatuses();

/**
* Shutdowns the server.
*
* @throws InterruptedException if the thread is interrupted
*/
void shutdown() throws InterruptedException;
}
Loading
Loading