Skip to content

Commit

Permalink
[#3497] Increase performance of Pub/Sub topic and subscription creation
Browse files Browse the repository at this point in the history
Increased performance by not listing topics & subscriptions, rather try to create them and catch the exception.

- remove listing of topics & subscriptions
- try to create topics & subscriptions and catch the exception if they already exist

Fixes #3497

Signed-off-by: michelle <[email protected]>
Co-authored-by: mattkaem <[email protected]>
  • Loading branch information
michelleFranke and mattkaem committed Jun 22, 2023
1 parent b006d69 commit 7ba7540
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class PubSubBasedInternalCommandConsumer implements InternalCommandConsum
private final PubSubSubscriberFactory subscriberFactory;
private final LifecycleStatus lifecycleStatus = new LifecycleStatus();
private final PubSubBasedAdminClientManager adminClientManager;
private final Vertx vertx;
private MessageReceiver receiver;

/**
Expand All @@ -89,19 +90,22 @@ public PubSubBasedInternalCommandConsumer(
final PubSubSubscriberFactory subscriberFactory,
final String projectId,
final CredentialsProvider credentialsProvider) {
Objects.requireNonNull(vertx);
Objects.requireNonNull(projectId);
Objects.requireNonNull(credentialsProvider);
this.vertx = Objects.requireNonNull(vertx);
this.commandResponseSender = Objects.requireNonNull(commandResponseSender);
this.adapterInstanceId = Objects.requireNonNull(adapterInstanceId);
this.commandHandlers = Objects.requireNonNull(commandHandlers);
this.tenantClient = Objects.requireNonNull(tenantClient);
this.tracer = Objects.requireNonNull(tracer);
this.subscriberFactory = Objects.requireNonNull(subscriberFactory);
this.adminClientManager = new PubSubBasedAdminClientManager(projectId, credentialsProvider, vertx);
this.adminClientManager = new PubSubBasedAdminClientManager(projectId, credentialsProvider);
createReceiver();
adminClientManager
.getOrCreateTopicAndSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)
.getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)
.onFailure(thr -> log.error("Could not create topic for endpoint {} and {}",
CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr))
.compose(t -> adminClientManager.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId))
.onFailure(thr -> log.error("Could not create subscription for endpoint {} and {}",
CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr))
.onSuccess(s -> subscriberFactory.getOrCreateSubscriber(s, receiver));
Expand All @@ -111,6 +115,7 @@ public PubSubBasedInternalCommandConsumer(
* Creates a Pub/Sub based internal command consumer. To be used for Unittests.
*
* @param commandResponseSender The sender used to send command responses.
* @param vertx The Vert.x instance to use.
* @param adapterInstanceId The adapter instance id.
* @param commandHandlers The command handlers to choose from for handling a received command.
* @param tenantClient The client to use for retrieving tenant configuration data.
Expand All @@ -122,14 +127,15 @@ public PubSubBasedInternalCommandConsumer(
*/
public PubSubBasedInternalCommandConsumer(
final CommandResponseSender commandResponseSender,
final Vertx vertx,
final String adapterInstanceId,
final CommandHandlers commandHandlers,
final TenantClient tenantClient,
final Tracer tracer,
final PubSubSubscriberFactory subscriberFactory,
final PubSubBasedAdminClientManager adminClientManager,
final MessageReceiver receiver) {

this.vertx = Objects.requireNonNull(vertx);
this.commandResponseSender = Objects.requireNonNull(commandResponseSender);
this.adapterInstanceId = Objects.requireNonNull(adapterInstanceId);
this.commandHandlers = Objects.requireNonNull(commandHandlers);
Expand All @@ -139,7 +145,10 @@ public PubSubBasedInternalCommandConsumer(
this.adminClientManager = Objects.requireNonNull(adminClientManager);
this.receiver = Objects.requireNonNull(receiver);
adminClientManager
.getOrCreateTopicAndSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)
.getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)
.onFailure(thr -> log.error("Could not create topic for endpoint {} and {}",
CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr))
.compose(t -> adminClientManager.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId))
.onFailure(thr -> log.error("Could not create subscription for endpoint {} and {}",
CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr))
.onSuccess(s -> subscriberFactory.getOrCreateSubscriber(s, receiver));
Expand Down Expand Up @@ -195,9 +204,7 @@ public Future<Void> start() {
}

private void createReceiver() {
receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
handleCommandMessage(message);
};
receiver = (PubsubMessage message, AckReplyConsumer consumer) -> handleCommandMessage(message);
}

Future<Void> handleCommandMessage(final PubsubMessage message) {
Expand Down Expand Up @@ -255,7 +262,11 @@ Future<Void> handleCommandMessage(final PubsubMessage message) {
public Future<Void> stop() {
return lifecycleStatus.runStopAttempt(() -> CompositeFuture.all(
subscriberFactory.closeSubscriber(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId),
adminClientManager.closeAdminClients()).mapEmpty());
vertx.executeBlocking(promise -> {
adminClientManager.closeAdminClients();
promise.complete();
})
).mapEmpty());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class PubSubBasedInternalCommandConsumerTest {

private final String adapterInstanceId = "test-adapter";

private final String subscription = String.format("%s.%s", adapterInstanceId,
private final String topicAndSubscription = String.format("%s.%s", adapterInstanceId,
CommandConstants.INTERNAL_COMMAND_ENDPOINT);

private PubSubSubscriberClient subscriber;
Expand All @@ -85,6 +85,7 @@ public class PubSubBasedInternalCommandConsumerTest {

@BeforeEach
void setUp() {
final Vertx vertxMock = mock(Vertx.class);
commandResponseSender = mock(CommandResponseSender.class);

final Tracer tracer = TracingMockSupport.mockTracer(TracingMockSupport.mockSpan());
Expand All @@ -104,19 +105,23 @@ void setUp() {

final PubSubBasedAdminClientManager adminClientManager = mock(PubSubBasedAdminClientManager.class);
when(adminClientManager
.getOrCreateTopicAndSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId))
.thenReturn(Future.succeededFuture(subscription));
.getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId))
.thenReturn(Future.succeededFuture(topicAndSubscription));
when(adminClientManager
.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId))
.thenReturn(Future.succeededFuture(topicAndSubscription));

subscriber = mock(PubSubSubscriberClient.class);
when(subscriber.subscribe(true)).thenReturn(Future.succeededFuture());

final MessageReceiver receiver = mock(MessageReceiver.class);
final PubSubSubscriberFactory subscriberFactory = mock(PubSubSubscriberFactory.class);
when(subscriberFactory.getOrCreateSubscriber(subscription, receiver))
when(subscriberFactory.getOrCreateSubscriber(topicAndSubscription, receiver))
.thenReturn(subscriber);

internalCommandConsumer = new PubSubBasedInternalCommandConsumer(
commandResponseSender,
vertxMock,
adapterInstanceId,
commandHandlers,
tenantClient,
Expand Down
Loading

0 comments on commit 7ba7540

Please sign in to comment.