Skip to content

Commit

Permalink
Merge pull request #223 from loicmathieu/feat/pubsub-emulator
Browse files Browse the repository at this point in the history
Feat/pubsub emulator
  • Loading branch information
loicmathieu authored Dec 28, 2021
2 parents 90868d8 + 6e099d2 commit 00255f7
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 75 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ import org.mockito.Mockito;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;

import io.quarkus.test.Mock;

Expand All @@ -87,8 +87,7 @@ public class GoogleCredentialsMockProducer {
@Singleton
@Default
public CredentialsProvider credentialsProvider() {
GoogleCredentials credentials = Mockito.mock(GoogleCredentials.class);
return FixedCredentialsProvider.create(credentials);
return NoCredentialsProvider.create();
}
}
```
Expand Down
5 changes: 2 additions & 3 deletions docs/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ import org.mockito.Mockito;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import io.quarkus.test.Mock;
Expand All @@ -84,8 +84,7 @@ public class GoogleCredentialsMockProducer {
@Singleton
@Default
public CredentialsProvider credentialsProvider() {
GoogleCredentials credentials = Mockito.mock(GoogleCredentials.class);
return FixedCredentialsProvider.create(credentials);
return NoCredentialsProvider.create();
}
}
----
Expand Down
4 changes: 4 additions & 0 deletions integration-tests/main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@
<version>1.5.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,68 +1,47 @@
package io.quarkiverse.googlecloudservices.it;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;

import io.quarkiverse.googlecloudservices.it.pubsub.TopicManager;

@Path("/pubsub")
public class PubSubResource {
private static final Logger LOG = Logger.getLogger(PubSubResource.class);

@ConfigProperty(name = "quarkus.google.cloud.project-id")
String projectId;

@Inject
CredentialsProvider credentialsProvider;
TopicManager topicManager;

private TopicName topicName;
private Subscriber subscriber;
private String lastMessage;

@PostConstruct
void init() throws IOException {
topicName = TopicName.of(projectId, "test-topic");
ProjectSubscriptionName subscriptionName = initTopicAndSubscription();

// subscribe to PubSub
MessageReceiver receiver = (message, consumer) -> {
LOG.infov("Got message {0}", message.getData().toStringUtf8());
this.lastMessage = message.getData().toStringUtf8();
LOG.infov("Got message {0}", this.lastMessage);
consumer.ack();
};
subscriber = Subscriber.newBuilder(subscriptionName, receiver)
.setCredentialsProvider(credentialsProvider)
.build();
subscriber = topicManager.initSubscriber(receiver);
subscriber.startAsync().awaitRunning();
}

Expand All @@ -73,14 +52,12 @@ void destroy() {
}
}

@GET
@Produces(MediaType.TEXT_PLAIN)
public void pubsub() throws IOException, InterruptedException {
Publisher publisher = Publisher.newBuilder(topicName)
.setCredentialsProvider(credentialsProvider)
.build();
@POST
@Consumes(MediaType.TEXT_PLAIN)
public void sendMessage(String message) throws IOException, InterruptedException {
Publisher publisher = topicManager.initPublisher();
try {
ByteString data = ByteString.copyFromUtf8("my-message");
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
Expand All @@ -98,34 +75,9 @@ public void onFailure(Throwable t) {
}
}

private ProjectSubscriptionName initTopicAndSubscription() throws IOException {
TopicAdminSettings topicAdminSettings = TopicAdminSettings.newBuilder()
.setCredentialsProvider(credentialsProvider)
.build();
try (TopicAdminClient topicAdminClient = TopicAdminClient.create(topicAdminSettings)) {
Iterable<Topic> topics = topicAdminClient.listTopics(ProjectName.of(projectId)).iterateAll();
Optional<Topic> existing = StreamSupport.stream(topics.spliterator(), false)
.filter(topic -> topic.getName().equals(topicName.toString()))
.findFirst();
if (!existing.isPresent()) {
topicAdminClient.createTopic(topicName.toString());
}
}

ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, "test-subscription");
SubscriptionAdminSettings subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder()
.setCredentialsProvider(credentialsProvider)
.build();
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings)) {
Iterable<Subscription> subscriptions = subscriptionAdminClient.listSubscriptions(ProjectName.of(projectId))
.iterateAll();
Optional<Subscription> existing = StreamSupport.stream(subscriptions.spliterator(), false)
.filter(sub -> sub.getName().equals(subscriptionName.toString()))
.findFirst();
if (!existing.isPresent()) {
subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);
}
}
return subscriptionName;
@GET
@Produces(MediaType.TEXT_PLAIN)
public String verifyMessage() {
return this.lastMessage;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package io.quarkiverse.googlecloudservices.it.pubsub;

import java.io.IOException;
import java.util.Optional;
import java.util.stream.StreamSupport;

import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.*;
import com.google.pubsub.v1.*;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

@ApplicationScoped
public class TopicManager {
@Inject
CredentialsProvider credentialsProvider;

@ConfigProperty(name = "quarkus.google.cloud.project-id")
String projectId;

@ConfigProperty(name = "pubsub.use-emulator", defaultValue = "false")
boolean useEmulator;

private TopicName topicName;
private Optional<TransportChannelProvider> channelProvider;

@PostConstruct
void init() {
this.topicName = TopicName.of(projectId, "test-topic");

if (useEmulator) {
ManagedChannel channel = ManagedChannelBuilder.forTarget("localhost:8085").usePlaintext().build();
channelProvider = Optional.of(FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)));
} else {
channelProvider = Optional.empty();
}
}

public Subscriber initSubscriber(MessageReceiver receiver) throws IOException {
ProjectSubscriptionName subscriptionName = initTopicAndSubscription();
var builder = Subscriber.newBuilder(subscriptionName, receiver)
.setCredentialsProvider(credentialsProvider);
channelProvider.ifPresent(builder::setChannelProvider);
return builder.build();
}

public Publisher initPublisher() throws IOException {
var builder = Publisher.newBuilder(topicName)
.setCredentialsProvider(credentialsProvider);
channelProvider.ifPresent(builder::setChannelProvider);
return builder.build();
}

private ProjectSubscriptionName initTopicAndSubscription() throws IOException {
TopicAdminSettings topicAdminSettings = getTopicAdminSettings();
try (TopicAdminClient topicAdminClient = TopicAdminClient.create(topicAdminSettings)) {
Iterable<Topic> topics = topicAdminClient.listTopics(ProjectName.of(projectId)).iterateAll();
Optional<Topic> existing = StreamSupport.stream(topics.spliterator(), false)
.filter(topic -> topic.getName().equals(topicName.toString()))
.findFirst();
if (existing.isEmpty()) {
topicAdminClient.createTopic(topicName.toString());
}
}

ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, "test-subscription");
SubscriptionAdminSettings subscriptionAdminSettings = getSubscriptionAdminSettings();
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings)) {
Iterable<Subscription> subscriptions = subscriptionAdminClient.listSubscriptions(ProjectName.of(projectId))
.iterateAll();
Optional<Subscription> existing = StreamSupport.stream(subscriptions.spliterator(), false)
.filter(sub -> sub.getName().equals(subscriptionName.toString()))
.findFirst();
if (existing.isEmpty()) {
subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);
}
}
return subscriptionName;
}

private SubscriptionAdminSettings getSubscriptionAdminSettings() throws IOException {
var builder = SubscriptionAdminSettings.newBuilder()
.setCredentialsProvider(credentialsProvider);
channelProvider.ifPresent(builder::setTransportChannelProvider);
return builder.build();
}

private TopicAdminSettings getTopicAdminSettings() throws IOException {
var builder = TopicAdminSettings.newBuilder()
.setCredentialsProvider(credentialsProvider);
channelProvider.ifPresent(builder::setTransportChannelProvider);
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
# Disable authentication for Bigtable on tests
%test.bigtable.authenticated=false

# Use pubsub emulator
%test.pubsub.use-emulator=true

# Secret Manager Demo
# You can load secrets from Google Cloud Secret Manager with the ${sm//<SECRET_ID>} syntax.
my.database.password=${sm//integration-test}
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package io.quarkiverse.googlecloudservices.it;

import static io.restassured.RestAssured.given;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;

import java.util.concurrent.TimeUnit;

import org.hamcrest.core.IsEqual;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;

@QuarkusTest
@EnabledIfSystemProperty(named = "gcloud.test", matches = "true")
Expand Down Expand Up @@ -38,10 +43,20 @@ public void testFirestore() {

@Test
public void testPubSub() {
String message = "Hello Pub/Sub";
given()
.when().get("/pubsub")
.body(message)
.contentType(ContentType.TEXT)
.when().post("/pubsub")
.then()
.statusCode(204);

await().atMost(3, TimeUnit.SECONDS)
.untilAsserted(() -> given()
.when().get("/pubsub")
.then()
.statusCode(200)
.body(IsEqual.equalTo(message)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.quarkiverse.googlecloudservices.it;

import static io.restassured.RestAssured.given;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.core.IsEqual.equalTo;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PubSubEmulatorContainer;
import org.testcontainers.utility.DockerImageName;

import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;

@QuarkusTest
public class PubSubResourceTest {
private static final PubSubEmulatorContainer EMULATOR = new PubSubEmulatorContainer(
DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk"));

@BeforeAll
public static void startGcloudContainer() {
List<String> portBindings = new ArrayList<>();
portBindings.add("8085:8085");
EMULATOR.setPortBindings(portBindings);
EMULATOR.start();
}

@AfterAll
public static void stopGcloudContainer() {
EMULATOR.stop();
}

@Test
public void testPubSub() throws ExecutionException, InterruptedException, TimeoutException {
String message = "Hello Pub/Sub";
given()
.body(message)
.contentType(ContentType.TEXT)
.when().post("/pubsub")
.then()
.statusCode(204);

await().atMost(3, TimeUnit.SECONDS)
.untilAsserted(() -> given()
.when().get("/pubsub")
.then()
.statusCode(200)
.body(equalTo(message)));
}
}
Loading

0 comments on commit 00255f7

Please sign in to comment.