Skip to content

Commit

Permalink
QuarkusPubSub to simplify PubSub subscriber and publisher creation
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Feb 9, 2024
1 parent f008ce4 commit b8a54ba
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 104 deletions.
68 changes: 17 additions & 51 deletions docs/modules/ROOT/pages/pubsub.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Be sure to have read the https://quarkiverse.github.io/quarkiverse-docs/quarkus-

== Bootstrapping the project

First, we need a new project.Create a new project with the following command (replace the version placeholder with the correct one):
First, we need a new project. Create a new project with the following command (replace the version placeholder with the correct one):

[source,shell script]
----
Expand Down Expand Up @@ -49,16 +49,14 @@ gcloud pubsub topics create test-topic

== Authentication

By default, PubSub mandates the usage of the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to define its credentials, so
you need to set this one instead of relying on the `quarkus.google.cloud.service-account-location` property.

[source]
----
export GOOGLE_APPLICATION_CREDENTIALS=<your-service-account-file>
----
This extension provides a `QuarkusPubSub` CDI bean that can help to interact with Google PubSub.
`QuarkusPubSub` is automatically authenticated, so you don't have to do anything else to use it.

If you don't want to use `QuarkusPubSub`, be sure to configure the authentication.
By default, PubSub mandates the usage of the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to define its credentials, so
you need to set it instead of relying on the `quarkus.google.cloud.service-account-location` property.
Another solution, is to inject a `CredentialsProvider` provided by the extension, and to use it inside the various PubSub
builders and settings objects when, instantiating PubSub components. This can be seen on the example that follows.
builders and settings objects when, instantiating PubSub components.

== Some example

Expand All @@ -69,9 +67,7 @@ We also register a consumer to the same topic at `@PostConstruct` time that logs
[source,java]
----
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
Expand All @@ -80,53 +76,43 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import io.quarkiverse.googlecloudservices.pubsub.QuarkusPubSub;
@Path("/pubsub")
public class PubSubResource {
private static final Logger LOG = Logger.getLogger(PubSubResource.class);
@ConfigProperty(name = "quarkus.google.cloud.project-id")
String projectId;// Inject the projectId property from application.properties
@Inject
QuarkusPubSub pubSub;
private TopicName topicName;
private Subscriber subscriber;
@Inject
CredentialsProvider credentialsProvider;
@PostConstruct
void init() throws IOException {
// Init topic and subscription, the topic must have been created before
topicName = TopicName.of(projectId, "test-topic");
ProjectSubscriptionName subscriptionName = initSubscription();
// init topic and subscription
pubSub.createTopic("test-topic");
pubSub.createSubscription("test-topic", "test-subscription");
// Subscribe to PubSub
MessageReceiver receiver = (message, consumer) -> {
LOG.infov("Got message {0}", message.getData().toStringUtf8());
consumer.ack();
};
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
subscriber = pubSub.subscriber("test-subscription", receiver);
subscriber.startAsync().awaitRunning();
}
Expand All @@ -142,9 +128,8 @@ public class PubSubResource {
@Produces(MediaType.TEXT_PLAIN)
public void pubsub() throws IOException, InterruptedException {
// Init a publisher to the topic
Publisher publisher = Publisher.newBuilder(topicName)
.setCredentialsProvider(credentialsProvider)
.build();
Publisher publisher = pubSub.publisher("test-topic");
try {
ByteString data = ByteString.copyFromUtf8("my-message");// Create a new message
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
Expand All @@ -163,25 +148,6 @@ public class PubSubResource {
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
private ProjectSubscriptionName initSubscription() throws IOException {
// List all existing subscriptions and create the 'test-subscription' if needed
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, "test-subscription");
SubscriptionAdminSettings subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder()
.setCredentialsProvider(credentialsProvider)
.build();
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings)) {
Iterable<Subscription> subscriptions = subscriptionAdminClient.listSubscriptions(ProjectName.of(projectId))
.iterateAll();
Optional<Subscription> existing = StreamSupport.stream(subscriptions.spliterator(), false)
.filter(sub -> sub.getName().equals(subscriptionName.toString()))
.findFirst();
if (existing.isEmpty()) {
subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);
}
}
return subscriptionName;
}
}
----

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,31 @@
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;

import io.quarkiverse.googlecloudservices.it.pubsub.TopicManager;
import io.quarkiverse.googlecloudservices.pubsub.QuarkusPubSub;

@Path("/pubsub")
public class PubSubResource {
private static final Logger LOG = Logger.getLogger(PubSubResource.class);

@Inject
TopicManager topicManager;
QuarkusPubSub pubSub;

private Subscriber subscriber;
private String lastMessage;

@PostConstruct
void init() throws IOException {
// init topic and subscription
pubSub.createTopic("test-topic");
pubSub.createSubscription("test-topic", "test-subscription");

// subscribe to PubSub
MessageReceiver receiver = (message, consumer) -> {
this.lastMessage = message.getData().toStringUtf8();
LOG.infov("Got message {0}", this.lastMessage);
consumer.ack();
};
subscriber = topicManager.initSubscriber(receiver);
subscriber = pubSub.subscriber("test-subscription", receiver);
subscriber.startAsync().awaitRunning();
}

Expand All @@ -55,12 +59,12 @@ void destroy() {
@POST
@Consumes(MediaType.TEXT_PLAIN)
public void sendMessage(String message) throws IOException, InterruptedException {
Publisher publisher = topicManager.initPublisher();
Publisher publisher = pubSub.publisher("test-topic");
try {
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<>() {
public void onSuccess(String messageId) {
LOG.infov("published with message id {0}", messageId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
%test.bigtable.authenticated=false

# Use pubsub emulator
%test.pubsub.use-emulator=true
%test.quarkus.google.cloud.pubsub.devservice.enabled=true
%test.quarkus.google.cloud.firestore.devservice.enabled=true
%test.quarkus.google.cloud.bigtable.devservice.enabled=true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.quarkiverse.googlecloudservices.pubsub.deployment;

import io.quarkiverse.googlecloudservices.pubsub.QuarkusPubSub;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.FeatureBuildItem;

Expand All @@ -10,4 +12,9 @@ public class PubSubBuildSteps {
public FeatureBuildItem feature() {
return new FeatureBuildItem(FEATURE);
}

@BuildStep
public AdditionalBeanBuildItem producer() {
return new AdditionalBeanBuildItem(QuarkusPubSub.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.quarkiverse.googlecloudservices.pubsub;

import java.util.Optional;

import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;

@ConfigRoot(name = "google.cloud.pubsub", phase = ConfigPhase.RUN_TIME)
public class PubSubConfiguration {
/**
* Enable emulator and set its host.
*/
@ConfigItem
public Optional<String> emulatorHost;

}
Loading

0 comments on commit b8a54ba

Please sign in to comment.