Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for PublishBlockStream to simulator #548

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: add publisher server grpc
Signed-off-by: georgi-l95 <[email protected]>
georgi-l95 committed Jan 15, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 68b1a52813b7419614f5016c61d0c146ba052b96
13 changes: 13 additions & 0 deletions simulator/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -47,6 +47,19 @@ tasks.register<JavaExec>("runPublisherClient") {
environment("PROMETHEUS_ENDPOINT_PORT_NUMBER", "9998")
}

tasks.register<JavaExec>("runPublisherServer") {
description = "Run the simulator in Publisher Server mode"
group = "application"

mainClass = application.mainClass
mainModule = application.mainModule
classpath = sourceSets["main"].runtimeClasspath

environment("BLOCK_STREAM_SIMULATOR_MODE", "PUBLISHER_SERVER")
environment("PROMETHEUS_ENDPOINT_ENABLED", "true")
environment("PROMETHEUS_ENDPOINT_PORT_NUMBER", "9996")
}

// Task to run simulator in Consumer mode
tasks.register<JavaExec>("runConsumer") {
description = "Run the simulator in Consumer mode"
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
*/
@ConfigData("blockStream")
public record BlockStreamConfig(
@ConfigProperty(defaultValue = "PUBLISHER_CLIENT") SimulatorMode simulatorMode,
@ConfigProperty(defaultValue = "PUBLISHER_SERVER") SimulatorMode simulatorMode,
@ConfigProperty(defaultValue = "10") int lastKnownStatusesCapacity,
@ConfigProperty(defaultValue = "1_500_000") int delayBetweenBlockItems,
@ConfigProperty(defaultValue = "100_000") int maxBlockItemsToStream,
Original file line number Diff line number Diff line change
@@ -132,7 +132,12 @@ public Builder lastKnownConsumersStatuses(List<String> lastKnownConsumersStatuse
*/
public StreamStatus build() {
return new StreamStatus(
publishedBlocks, processedBlocks, consumedBlocks, lastKnownPublisherClientStatuses, lastKnownPublisherServerStatuses, lastKnownConsumersStatuses);
publishedBlocks,
processedBlocks,
consumedBlocks,
lastKnownPublisherClientStatuses,
lastKnownPublisherServerStatuses,
lastKnownConsumersStatuses);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.simulator.grpc;

import java.util.List;

public interface PublishStreamGrpcServer {
void init();
/**
* Gets the number of processed blocks.
*
@@ -16,4 +18,16 @@ public interface PublishStreamGrpcServer {
* @return the last known statuses
*/
List<String> getLastKnownStatuses();

/**
* Sends a onCompleted message to the client and waits for a short period of time to ensure the message is sent.
*/
void completeStreaming();

/**
* Shutdowns the server.
*
* @throws InterruptedException if the thread is interrupted
*/
void shutdown() throws InterruptedException;
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,75 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.simulator.grpc.impl;

import com.hedera.block.simulator.grpc.PublishStreamGrpcServer;
import static com.hedera.block.simulator.metrics.SimulatorMetricTypes.Counter.LiveBlocksProcessed;
import static java.lang.System.Logger.Level.ERROR;
import static java.util.Objects.requireNonNull;

import javax.inject.Inject;
import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.config.data.GrpcConfig;
import com.hedera.block.simulator.grpc.PublishStreamGrpcServer;
import com.hedera.block.simulator.metrics.MetricsService;
import com.hedera.hapi.block.protoc.BlockStreamServiceGrpc;
import com.hedera.hapi.block.protoc.PublishStreamRequest;
import com.hedera.hapi.block.protoc.PublishStreamResponse;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import javax.inject.Inject;

public class PublishStreamGrpcServerImpl implements PublishStreamGrpcServer {
private final System.Logger LOGGER = System.getLogger(getClass().getName());

// gRPC Components
private Server server;
private PublishStreamServerObserver publishStreamServerObserver;

// Configuration
private final BlockStreamConfig blockStreamConfig;
private final GrpcConfig grpcConfig;

// Service dependencies
private final MetricsService metricsService;

// State
private final int lastKnownStatusesCapacity;
private final Deque<String> lastKnownStatuses;

@Inject
public PublishStreamGrpcServerImpl() {
public PublishStreamGrpcServerImpl(
@NonNull final GrpcConfig grpcConfig,
@NonNull final BlockStreamConfig blockStreamConfig,
@NonNull final MetricsService metricsService) {
this.grpcConfig = requireNonNull(grpcConfig);
this.metricsService = requireNonNull(metricsService);
this.blockStreamConfig = requireNonNull(blockStreamConfig);

this.lastKnownStatusesCapacity = blockStreamConfig.lastKnownStatusesCapacity();
lastKnownStatuses = new ArrayDeque<>(this.lastKnownStatusesCapacity);
}

@Override
public void init() {
server = ServerBuilder.forPort(grpcConfig.port())
.addService(new BlockStreamServiceGrpc.BlockStreamServiceImplBase() {
@Override
public StreamObserver<PublishStreamRequest> publishBlockStream(
StreamObserver<PublishStreamResponse> responseObserver) {
publishStreamServerObserver = new PublishStreamServerObserver(responseObserver);
return publishStreamServerObserver;
}
})
.build();
try {
server.start();
} catch (IOException e) {
LOGGER.log(ERROR, e);
}
}

/**
@@ -19,7 +79,7 @@ public PublishStreamGrpcServerImpl() {
*/
@Override
public long getProcessedBlocks() {
return 0;
return metricsService.get(LiveBlocksProcessed).get();
}

/**
@@ -29,6 +89,26 @@ public long getProcessedBlocks() {
*/
@Override
public List<String> getLastKnownStatuses() {
return null;
return List.copyOf(lastKnownStatuses);
}

/**
* Sends a onCompleted message to the client and waits for a short period of
* time to ensure the message is sent.
*/
@Override
public void completeStreaming() {
publishStreamServerObserver.onCompleted();
}

/**
* Shutdowns the channel.
*
* @throws InterruptedException if the thread is interrupted
*/
@Override
public void shutdown() throws InterruptedException {
completeStreaming();
server.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.simulator.grpc.impl;

import static java.lang.System.Logger.Level.ERROR;
import static java.lang.System.Logger.Level.INFO;
import static java.util.Objects.requireNonNull;

import com.hedera.hapi.block.protoc.PublishStreamRequest;
import com.hedera.hapi.block.protoc.PublishStreamResponse;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

public class PublishStreamServerObserver implements StreamObserver<PublishStreamRequest> {
private final System.Logger LOGGER = System.getLogger(getClass().getName());

// gRPC Components
private final StreamObserver<PublishStreamResponse> responseObserver;

public PublishStreamServerObserver(StreamObserver<PublishStreamResponse> responseObserver) {
this.responseObserver = requireNonNull(responseObserver);
}

@Override
public void onNext(PublishStreamRequest publishStreamRequest) {
LOGGER.log(INFO, publishStreamRequest.getBlockItems());
// send block ack. if there is a block proof in the set
// send item ack. if there is no block proof
}

@Override
public void onError(@NonNull final Throwable streamError) {
Status status = Status.fromThrowable(streamError);
LOGGER.log(ERROR, "Error %s with status %s.".formatted(streamError, status), streamError);
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
LOGGER.log(INFO, "Completed");
}
}
Original file line number Diff line number Diff line change
@@ -24,6 +24,8 @@ public enum Counter implements SimulatorMetricMetadata {
LiveBlockItemsSent("live_block_items_sent", "Live Block Items Sent"),
/** The number of live blocks sent by the simulator */
LiveBlocksSent("live_blocks_sent", "Live Blocks Sent"),
/** The number of live blocks processed by the simulator publish server */
LiveBlocksProcessed("live_blocks_processed_by_server", "Live Blocks Processed by Publish Server"),
/** The number of live blocks consumed by the simulator */
LiveBlocksConsumed("live_blocks_consumed", "Live Blocks Consumed");

Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.simulator.mode;

import static java.lang.System.Logger.Level.INFO;
import static java.util.Objects.requireNonNull;

import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.grpc.PublishStreamGrpcServer;
import edu.umd.cs.findbugs.annotations.NonNull;

import javax.inject.Inject;

import static java.util.Objects.requireNonNull;

/**
* The {@code PublisherServerModeHandler} class implements the {@link SimulatorModeHandler} interface
* and provides the behavior for a mode where simulator is working using PublishBlockStream and acts as a server.
@@ -33,33 +33,22 @@ public PublisherServerModeHandler(@NonNull final PublishStreamGrpcServer publish
this.publishStreamGrpcServer = requireNonNull(publishStreamGrpcServer);
}

/**
* Initializes resources for both consuming and publishing blocks.
*
* @throws UnsupportedOperationException as this functionality is not yet implemented
*/
@Override
public void init() {
throw new UnsupportedOperationException();
publishStreamGrpcServer.init();
LOGGER.log(INFO, "gRPC Server initialized for receiving blocks using publish protocol.");
}

/**
* Starts both consuming and publishing blocks simultaneously.
*
* @throws UnsupportedOperationException as this functionality is not yet implemented
*/
@Override
public void start() {
throw new UnsupportedOperationException();
}
public void start() {}

/**
* Gracefully stops both consumption and publishing of blocks.
*
* @throws UnsupportedOperationException as this functionality is not yet implemented
*/
@Override
public void stop() {
throw new UnsupportedOperationException();
public void stop() throws InterruptedException {
publishStreamGrpcServer.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -13,8 +13,8 @@
* <p>Examples of working modes include:
* <ul>
* <li>Consumer mode: The simulator consumes data from the block stream.</li>
* <li>Publisher mode: The simulator publishes data to the block stream.</li>
* <li>Combined mode: The simulator handles both consuming and publishing.</li>
* <li>Publisher Client mode: The simulator publishes data to the block stream.</li>
* <li>Publisher Server mode: The simulator receives blocks from client and sends back acknowledgments or errors.</li>
* </ul>
*/
public interface SimulatorModeHandler {
@@ -27,7 +27,7 @@ public interface SimulatorModeHandler {

/**
* Starts the simulator and initiates the streaming process according to the configured mode.
* The behavior of this method depends on the specific working mode (consumer, publisher, or combined).
* The behavior of this method depends on the specific working mode (consumer, publisher in client mode, or publisher in server mode).
*
* @throws BlockSimulatorParsingException if an error occurs while parsing blocks
* @throws IOException if an I/O error occurs during block streaming
1 change: 1 addition & 0 deletions simulator/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
exports com.hedera.block.simulator.generator;
exports com.hedera.block.simulator.metrics;
exports com.hedera.block.simulator.grpc.impl;
exports com.hedera.block.simulator.mode;

requires com.hedera.block.common;
requires com.hedera.block.stream;
Loading