Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QuarkusPubSub to simplify PubSub subscriber and publisher creation #577

Merged
merged 1 commit into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 17 additions & 51 deletions docs/modules/ROOT/pages/pubsub.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Be sure to have read the https://quarkiverse.github.io/quarkiverse-docs/quarkus-

== Bootstrapping the project

First, we need a new project.Create a new project with the following command (replace the version placeholder with the correct one):
First, we need a new project. Create a new project with the following command (replace the version placeholder with the correct one):

[source,shell script]
----
Expand Down Expand Up @@ -49,16 +49,14 @@ gcloud pubsub topics create test-topic

== Authentication

By default, PubSub mandates the usage of the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to define its credentials, so
you need to set this one instead of relying on the `quarkus.google.cloud.service-account-location` property.

[source]
----
export GOOGLE_APPLICATION_CREDENTIALS=<your-service-account-file>
----
This extension provides a `QuarkusPubSub` CDI bean that can help to interact with Google PubSub.
`QuarkusPubSub` is automatically authenticated, so you don't have to do anything else to use it.

If you don't want to use `QuarkusPubSub`, be sure to configure the authentication.
By default, PubSub mandates the usage of the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to define its credentials, so
you need to set it instead of relying on the `quarkus.google.cloud.service-account-location` property.
Another solution, is to inject a `CredentialsProvider` provided by the extension, and to use it inside the various PubSub
builders and settings objects when, instantiating PubSub components. This can be seen on the example that follows.
builders and settings objects when, instantiating PubSub components.

== Some example

Expand All @@ -69,9 +67,7 @@ We also register a consumer to the same topic at `@PostConstruct` time that logs
[source,java]
----
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
Expand All @@ -80,53 +76,43 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;

import io.quarkiverse.googlecloudservices.pubsub.QuarkusPubSub;

@Path("/pubsub")
public class PubSubResource {
private static final Logger LOG = Logger.getLogger(PubSubResource.class);

@ConfigProperty(name = "quarkus.google.cloud.project-id")
String projectId;// Inject the projectId property from application.properties
@Inject
QuarkusPubSub pubSub;

private TopicName topicName;
private Subscriber subscriber;

@Inject
CredentialsProvider credentialsProvider;

@PostConstruct
void init() throws IOException {
// Init topic and subscription, the topic must have been created before
topicName = TopicName.of(projectId, "test-topic");
ProjectSubscriptionName subscriptionName = initSubscription();
// init topic and subscription
pubSub.createTopic("test-topic");
pubSub.createSubscription("test-topic", "test-subscription");

// Subscribe to PubSub
MessageReceiver receiver = (message, consumer) -> {
LOG.infov("Got message {0}", message.getData().toStringUtf8());
consumer.ack();
};
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
subscriber = pubSub.subscriber("test-subscription", receiver);
subscriber.startAsync().awaitRunning();
}

Expand All @@ -142,9 +128,8 @@ public class PubSubResource {
@Produces(MediaType.TEXT_PLAIN)
public void pubsub() throws IOException, InterruptedException {
// Init a publisher to the topic
Publisher publisher = Publisher.newBuilder(topicName)
.setCredentialsProvider(credentialsProvider)
.build();
Publisher publisher = pubSub.publisher("test-topic");

try {
ByteString data = ByteString.copyFromUtf8("my-message");// Create a new message
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
Expand All @@ -163,25 +148,6 @@ public class PubSubResource {
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}

private ProjectSubscriptionName initSubscription() throws IOException {
// List all existing subscriptions and create the 'test-subscription' if needed
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, "test-subscription");
SubscriptionAdminSettings subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder()
.setCredentialsProvider(credentialsProvider)
.build();
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings)) {
Iterable<Subscription> subscriptions = subscriptionAdminClient.listSubscriptions(ProjectName.of(projectId))
.iterateAll();
Optional<Subscription> existing = StreamSupport.stream(subscriptions.spliterator(), false)
.filter(sub -> sub.getName().equals(subscriptionName.toString()))
.findFirst();
if (existing.isEmpty()) {
subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);
}
}
return subscriptionName;
}
}
----

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,31 @@
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;

import io.quarkiverse.googlecloudservices.it.pubsub.TopicManager;
import io.quarkiverse.googlecloudservices.pubsub.QuarkusPubSub;

@Path("/pubsub")
public class PubSubResource {
private static final Logger LOG = Logger.getLogger(PubSubResource.class);

@Inject
TopicManager topicManager;
QuarkusPubSub pubSub;

private Subscriber subscriber;
private String lastMessage;

@PostConstruct
void init() throws IOException {
// init topic and subscription
pubSub.createTopic("test-topic");
pubSub.createSubscription("test-topic", "test-subscription");

// subscribe to PubSub
MessageReceiver receiver = (message, consumer) -> {
this.lastMessage = message.getData().toStringUtf8();
LOG.infov("Got message {0}", this.lastMessage);
consumer.ack();
};
subscriber = topicManager.initSubscriber(receiver);
subscriber = pubSub.subscriber("test-subscription", receiver);
subscriber.startAsync().awaitRunning();
}

Expand All @@ -55,12 +59,12 @@ void destroy() {
@POST
@Consumes(MediaType.TEXT_PLAIN)
public void sendMessage(String message) throws IOException, InterruptedException {
Publisher publisher = topicManager.initPublisher();
Publisher publisher = pubSub.publisher("test-topic");
try {
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<>() {
public void onSuccess(String messageId) {
LOG.infov("published with message id {0}", messageId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
%test.bigtable.authenticated=false

# Use pubsub emulator
%test.pubsub.use-emulator=true
%test.quarkus.google.cloud.pubsub.devservice.enabled=true
%test.quarkus.google.cloud.firestore.devservice.enabled=true
%test.quarkus.google.cloud.bigtable.devservice.enabled=true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.quarkiverse.googlecloudservices.pubsub.deployment;

import io.quarkiverse.googlecloudservices.pubsub.QuarkusPubSub;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.FeatureBuildItem;

Expand All @@ -10,4 +12,9 @@ public class PubSubBuildSteps {
public FeatureBuildItem feature() {
return new FeatureBuildItem(FEATURE);
}

@BuildStep
public AdditionalBeanBuildItem producer() {
return new AdditionalBeanBuildItem(QuarkusPubSub.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.quarkiverse.googlecloudservices.pubsub;

import java.util.Optional;

import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;

@ConfigRoot(name = "google.cloud.pubsub", phase = ConfigPhase.RUN_TIME)
public class PubSubConfiguration {
/**
* Enable emulator and set its host.
*/
@ConfigItem
public Optional<String> emulatorHost;

}
Loading
Loading