Skip to content

Commit

Permalink
fix: switched to a completion service using a single thread per consumer
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jan 30, 2025
1 parent c9091a6 commit aeda7c0
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
import com.hedera.hapi.block.SubscribeStreamResponseUnparsed;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/**
* The AsyncConsumerStreamResponseObserver class is responsible for decoupling the
Expand All @@ -19,25 +20,24 @@
class AsyncConsumerStreamResponseObserver
implements BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponseUnparsed>> {

private final ExecutorService executorService;
private final CompletionService<Void> completionService;
private final SubscriptionHandler<SubscribeStreamResponseUnparsed> subscriptionHandler;
private final BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponseUnparsed>> nextBlockNodeEventHandler;

/**
* Constructor for the AsyncConsumerStreamResponseObserver class.
*
* @param executorService the executor service to use for asynchronous processing
* @param subscriptionHandler the handler for managing subscriptions
* @param nextBlockNodeEventHandler the next block node event handler in the chain
*/
public AsyncConsumerStreamResponseObserver(
@NonNull final ExecutorService executorService,
@NonNull final CompletionService<Void> completionService,
@NonNull final SubscriptionHandler<SubscribeStreamResponseUnparsed> subscriptionHandler,
@NonNull
final BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponseUnparsed>>
nextBlockNodeEventHandler) {

this.executorService = Objects.requireNonNull(executorService);
this.completionService = Objects.requireNonNull(completionService);
this.subscriptionHandler = Objects.requireNonNull(subscriptionHandler);
this.nextBlockNodeEventHandler = Objects.requireNonNull(nextBlockNodeEventHandler);
}
Expand All @@ -50,9 +50,13 @@ public void onEvent(
@NonNull final ObjectEvent<SubscribeStreamResponseUnparsed> event, final long l, final boolean b) {

try {
executorService
.submit(new Task(event, l, b, subscriptionHandler, this, nextBlockNodeEventHandler))
.get();
completionService.submit(new Task(event, l, b, subscriptionHandler, this, nextBlockNodeEventHandler));

Future<Void> future = completionService.poll(); // Non-blocking check
if (future != null) {
future.get();
}

} catch (ExecutionException | InterruptedException e) {
Throwable cause = e.getCause();
if (cause instanceof IllegalArgumentException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@
import com.hedera.pbj.runtime.grpc.Pipeline;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.InstantSource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CompletionService;

/**
* LiveStreamEventHandlerBuilder is a factory class for building the event handler chain for
* streaming block items.
*/
public class LiveStreamEventHandlerBuilder {
public static BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponseUnparsed>> build(
@NonNull final ExecutorService executorService,
@NonNull final CompletionService<Void> completionService,
@NonNull final InstantSource producerLivenessClock,
@NonNull final SubscriptionHandler<SubscribeStreamResponseUnparsed> subscriptionHandler,
@NonNull final Pipeline<? super SubscribeStreamResponseUnparsed> observer,
Expand All @@ -28,7 +28,7 @@ public static BlockNodeEventHandler<ObjectEvent<SubscribeStreamResponseUnparsed>
new ConsumerStreamResponseObserver(producerLivenessClock, observer, blockNodeContext);

final var asyncConsumerStreamResponseObserver = new AsyncConsumerStreamResponseObserver(
executorService, subscriptionHandler, consumerStreamResponseObserver);
completionService, subscriptionHandler, consumerStreamResponseObserver);

// Set the link backward to handle unsubscribe events
consumerStreamResponseObserver.setPrevSubscriptionHandler(asyncConsumerStreamResponseObserver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.time.Clock;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import javax.inject.Inject;

Expand All @@ -43,7 +43,6 @@ public class PbjBlockStreamServiceProxy implements PbjBlockStreamService {

private final System.Logger LOGGER = System.getLogger(getClass().getName());

private final ExecutorService executorService;
private final LiveStreamMediator streamMediator;
private final ServiceStatus serviceStatus;
private final BlockNodeContext blockNodeContext;
Expand Down Expand Up @@ -74,7 +73,6 @@ public PbjBlockStreamServiceProxy(
streamMediator.subscribe(Objects.requireNonNull(streamPersistenceHandler));
streamMediator.subscribe(Objects.requireNonNull(streamVerificationHandler));
this.streamMediator = Objects.requireNonNull(streamMediator);
this.executorService = Executors.newVirtualThreadPerTaskExecutor();
}

/**
Expand Down Expand Up @@ -168,7 +166,7 @@ void subscribeBlockStream(
// Unsubscribe any expired notifiers
streamMediator.unsubscribeAllExpired();
final var liveStreamEventHandler = LiveStreamEventHandlerBuilder.build(
executorService,
new ExecutorCompletionService<>(Executors.newSingleThreadExecutor()),
Clock.systemDefaultZone(),
streamMediator,
subscribeStreamResponseObserver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import java.io.UncheckedIOException;
import java.time.InstantSource;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand Down Expand Up @@ -53,6 +56,13 @@ public class ConsumerStreamResponseObserverTest {

final BlockNodeContext testContext;

private CompletionService<Void> completionService;

@BeforeEach
public void setUp() {
completionService = new ExecutorCompletionService<>(Executors.newSingleThreadExecutor());
}

public ConsumerStreamResponseObserverTest() throws IOException {
this.testContext = TestConfigUtil.getTestBlockNodeContext(
Map.of(TestConfigUtil.CONSUMER_TIMEOUT_THRESHOLD_KEY, String.valueOf(TIMEOUT_THRESHOLD_MILLIS)));
Expand All @@ -64,7 +74,7 @@ public void testProducerTimeoutWithinWindow() throws Exception {
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS);

final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build(
new ForkJoinPool(), testClock, streamMediator, responseStreamObserver, testContext);
completionService, testClock, streamMediator, responseStreamObserver, testContext);

final BlockHeader blockHeader = BlockHeader.newBuilder().number(1).build();
final BlockItemUnparsed blockItem = BlockItemUnparsed.newBuilder()
Expand Down Expand Up @@ -95,7 +105,7 @@ public void testProducerTimeoutOutsideWindow() throws Exception {
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS + 1);

final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build(
new ForkJoinPool(), testClock, streamMediator, responseStreamObserver, testContext);
completionService, testClock, streamMediator, responseStreamObserver, testContext);

consumerBlockItemObserver.onEvent(objectEvent, 0, true);
verify(streamMediator, timeout(testTimeout)).unsubscribe(consumerBlockItemObserver);
Expand All @@ -109,7 +119,7 @@ public void testConsumerNotToSendBeforeBlockHeader() throws Exception {
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS);

final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build(
new ForkJoinPool(), testClock, streamMediator, responseStreamObserver, testContext);
completionService, testClock, streamMediator, responseStreamObserver, testContext);

// Send non-header BlockItems to validate that the observer does not send them
for (int i = 1; i <= 10; i++) {
Expand Down Expand Up @@ -172,7 +182,7 @@ public void testSubscriberStreamResponseIsBlockItemWhenBlockItemIsNull() {
when(objectEvent.get()).thenReturn(subscribeStreamResponse);

final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build(
new ForkJoinPool(), testClock, streamMediator, responseStreamObserver, testContext);
completionService, testClock, streamMediator, responseStreamObserver, testContext);
assertThrows(IllegalArgumentException.class, () -> consumerBlockItemObserver.onEvent(objectEvent, 0, true));
}

Expand All @@ -184,7 +194,7 @@ public void testSubscribeStreamResponseTypeNotSupported() {
when(objectEvent.get()).thenReturn(subscribeStreamResponse);

final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build(
new ForkJoinPool(), testClock, streamMediator, responseStreamObserver, testContext);
completionService, testClock, streamMediator, responseStreamObserver, testContext);

assertThrows(IllegalArgumentException.class, () -> consumerBlockItemObserver.onEvent(objectEvent, 0, true));
}
Expand All @@ -204,7 +214,7 @@ public void testUncheckedIOExceptionException() throws Exception {
doThrow(UncheckedIOException.class).when(responseStreamObserver).onNext(subscribeStreamResponse);

final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build(
new ForkJoinPool(), testClock, streamMediator, responseStreamObserver, testContext);
completionService, testClock, streamMediator, responseStreamObserver, testContext);
consumerBlockItemObserver.onEvent(objectEvent, 0, true);

verify(streamMediator, timeout(testTimeout).times(1)).unsubscribe(any());
Expand All @@ -225,7 +235,7 @@ public void testRuntimeException() throws Exception {
doThrow(RuntimeException.class).when(responseStreamObserver).onNext(subscribeStreamResponse);

final var consumerBlockItemObserver = LiveStreamEventHandlerBuilder.build(
new ForkJoinPool(), testClock, streamMediator, responseStreamObserver, testContext);
completionService, testClock, streamMediator, responseStreamObserver, testContext);
consumerBlockItemObserver.onEvent(objectEvent, 0, true);

verify(streamMediator, timeout(testTimeout).times(1)).unsubscribe(any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand Down Expand Up @@ -86,15 +87,15 @@ public class LiveStreamMediatorImplTest {

private final BlockNodeContext testContext;

private final ExecutorService executorService;
private final CompletionService<Void> completionService;

public LiveStreamMediatorImplTest() throws IOException {
Map<String, String> properties = new HashMap<>();
properties.put(TestConfigUtil.CONSUMER_TIMEOUT_THRESHOLD_KEY, String.valueOf(TIMEOUT_THRESHOLD_MILLIS));
properties.put(TestConfigUtil.MEDIATOR_RING_BUFFER_SIZE_KEY, String.valueOf(1024));

this.testContext = TestConfigUtil.getTestBlockNodeContext(properties);
this.executorService = new ForkJoinPool();
this.completionService = new ExecutorCompletionService<>(Executors.newSingleThreadExecutor());
}

@Test
Expand Down Expand Up @@ -167,11 +168,11 @@ public void testMediatorPublishEventToSubscribers() throws IOException, ParseExc
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS);

final var concreteObserver1 = LiveStreamEventHandlerBuilder.build(
executorService, testClock, streamMediator, helidonSubscribeStreamObserver1, testContext);
completionService, testClock, streamMediator, helidonSubscribeStreamObserver1, testContext);
final var concreteObserver2 = LiveStreamEventHandlerBuilder.build(
executorService, testClock, streamMediator, helidonSubscribeStreamObserver2, testContext);
completionService, testClock, streamMediator, helidonSubscribeStreamObserver2, testContext);
final var concreteObserver3 = LiveStreamEventHandlerBuilder.build(
executorService, testClock, streamMediator, helidonSubscribeStreamObserver3, testContext);
completionService, testClock, streamMediator, helidonSubscribeStreamObserver3, testContext);

// Set up the subscribers
streamMediator.subscribe(concreteObserver1);
Expand Down Expand Up @@ -225,11 +226,11 @@ public void testSubAndUnsubHandling() throws IOException {
when(testClock.millis()).thenReturn(TEST_TIME, TEST_TIME + TIMEOUT_THRESHOLD_MILLIS);

final var concreteObserver1 = LiveStreamEventHandlerBuilder.build(
executorService, testClock, streamMediator, helidonSubscribeStreamObserver1, testContext);
completionService, testClock, streamMediator, helidonSubscribeStreamObserver1, testContext);
final var concreteObserver2 = LiveStreamEventHandlerBuilder.build(
executorService, testClock, streamMediator, helidonSubscribeStreamObserver2, testContext);
completionService, testClock, streamMediator, helidonSubscribeStreamObserver2, testContext);
final var concreteObserver3 = LiveStreamEventHandlerBuilder.build(
executorService, testClock, streamMediator, helidonSubscribeStreamObserver3, testContext);
completionService, testClock, streamMediator, helidonSubscribeStreamObserver3, testContext);

// Set up the subscribers
streamMediator.subscribe(concreteObserver1);
Expand All @@ -253,7 +254,7 @@ public void testSubscribeWhenHandlerAlreadySubscribed() throws IOException {
.build();

final var concreteObserver1 = LiveStreamEventHandlerBuilder.build(
executorService, testClock, streamMediator, helidonSubscribeStreamObserver1, testContext);
completionService, testClock, streamMediator, helidonSubscribeStreamObserver1, testContext);

streamMediator.subscribe(concreteObserver1);
assertTrue(streamMediator.isSubscribed(concreteObserver1));
Expand Down Expand Up @@ -383,11 +384,11 @@ public void testMediatorBlocksPublishAfterException() throws IOException, Interr
.build();

final var concreteObserver1 = LiveStreamEventHandlerBuilder.build(
executorService, testClock, streamMediator, helidonSubscribeStreamObserver1, testContext);
completionService, testClock, streamMediator, helidonSubscribeStreamObserver1, testContext);
final var concreteObserver2 = LiveStreamEventHandlerBuilder.build(
executorService, testClock, streamMediator, helidonSubscribeStreamObserver2, testContext);
completionService, testClock, streamMediator, helidonSubscribeStreamObserver2, testContext);
final var concreteObserver3 = LiveStreamEventHandlerBuilder.build(
executorService, testClock, streamMediator, helidonSubscribeStreamObserver3, testContext);
completionService, testClock, streamMediator, helidonSubscribeStreamObserver3, testContext);

// Set up the subscribers
streamMediator.subscribe(concreteObserver1);
Expand Down Expand Up @@ -463,7 +464,7 @@ public void testUnsubscribeWhenNotSubscribed() throws IOException {
streamMediator.subscribe(handler);

final var testConsumerBlockItemObserver = LiveStreamEventHandlerBuilder.build(
executorService, testClock, streamMediator, helidonSubscribeStreamObserver1, testContext);
completionService, testClock, streamMediator, helidonSubscribeStreamObserver1, testContext);

// Confirm the observer is not subscribed
assertFalse(streamMediator.isSubscribed(testConsumerBlockItemObserver));
Expand Down

0 comments on commit aeda7c0

Please sign in to comment.