From 67f158bd11d63758e313438537739a0ff677b841 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Fri, 12 Jan 2024 14:31:41 +0100 Subject: [PATCH] Produce a BigQueryWriteClient Fixes #194 --- .../bigquery/runtime/BigQueryProducer.java | 20 +++++++- docs/modules/ROOT/pages/bigquery.adoc | 27 ++++++++++- .../it/BigQueryResource.java | 47 ++++++++++++++++++- 3 files changed, 91 insertions(+), 3 deletions(-) diff --git a/bigquery/runtime/src/main/java/io/quarkiverse/googlecloudservices/bigquery/runtime/BigQueryProducer.java b/bigquery/runtime/src/main/java/io/quarkiverse/googlecloudservices/bigquery/runtime/BigQueryProducer.java index 85050c4b..cb4f2722 100644 --- a/bigquery/runtime/src/main/java/io/quarkiverse/googlecloudservices/bigquery/runtime/BigQueryProducer.java +++ b/bigquery/runtime/src/main/java/io/quarkiverse/googlecloudservices/bigquery/runtime/BigQueryProducer.java @@ -8,9 +8,12 @@ import jakarta.inject.Inject; import jakarta.inject.Singleton; +import com.google.api.gax.core.CredentialsProvider; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; import io.quarkiverse.googlecloudservices.common.GcpBootstrapConfiguration; import io.quarkiverse.googlecloudservices.common.GcpConfigHolder; @@ -21,17 +24,32 @@ public class BigQueryProducer { @Inject GoogleCredentials googleCredentials; + @Inject + CredentialsProvider credentialsProvider; + @Inject GcpConfigHolder gcpConfigHolder; @Produces @Singleton @Default - public BigQuery bigQuery() throws IOException { + public BigQuery bigQuery() { GcpBootstrapConfiguration gcpConfiguration = gcpConfigHolder.getBootstrapConfig(); return BigQueryOptions.newBuilder().setCredentials(googleCredentials) .setProjectId(gcpConfiguration.projectId().orElse(null)) .build() .getService(); } + + @Produces + @Singleton + @Default + public BigQueryWriteClient bigQueryWriteClient() throws IOException { + GcpBootstrapConfiguration gcpConfiguration = gcpConfigHolder.getBootstrapConfig(); + BigQueryWriteSettings bigQueryWriteSettings = BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(credentialsProvider) + .setQuotaProjectId(gcpConfiguration.projectId().orElse(null)) + .build(); + return BigQueryWriteClient.create(bigQueryWriteSettings); + } } diff --git a/docs/modules/ROOT/pages/bigquery.adoc b/docs/modules/ROOT/pages/bigquery.adoc index be6a8e6a..1f109c46 100644 --- a/docs/modules/ROOT/pages/bigquery.adoc +++ b/docs/modules/ROOT/pages/bigquery.adoc @@ -1,6 +1,6 @@ = Google Cloud Services - BigQuery -This extension allows to inject a `com.google.cloud.bigquery.BigQuery` object inside your Quarkus application. +This extension allows to inject a `com.google.cloud.bigquery.BigQuery` object or a `com.google.cloud.bigquery.storage.v1.BigQueryWriteClient` object inside your Quarkus application. Be sure to have read the https://quarkiverse.github.io/quarkiverse-docs/quarkus-google-cloud-services/main/index.html[Google Cloud Services extension pack global documentation] before this one, it contains general configuration and information. @@ -97,3 +97,28 @@ public class BigQueryResource { } } ---- + +If you want to use the BigQuery Storage Write API you can inject a `BigQueryWriteClient` : + +[source, java] +---- +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; + +@Path("/bigquery") +public class BigQueryResource { + @Inject + BigQueryWriteClient bigQueryWriteClient; + + @GET + @Produces(MediaType.TEXT_PLAIN) + public String bigquery() { + // do whatever you want with the BigQueryWriteClient ... + } +} +---- \ No newline at end of file diff --git a/integration-tests/main/src/main/java/io/quarkiverse/googlecloudservices/it/BigQueryResource.java b/integration-tests/main/src/main/java/io/quarkiverse/googlecloudservices/it/BigQueryResource.java index 4f5b47f9..b14738d3 100644 --- a/integration-tests/main/src/main/java/io/quarkiverse/googlecloudservices/it/BigQueryResource.java +++ b/integration-tests/main/src/main/java/io/quarkiverse/googlecloudservices/it/BigQueryResource.java @@ -1,6 +1,8 @@ package io.quarkiverse.googlecloudservices.it; +import java.io.IOException; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -10,18 +12,30 @@ import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.json.JSONArray; +import org.json.JSONObject; + import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobId; import com.google.cloud.bigquery.JobInfo; import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.TableResult; +import com.google.cloud.bigquery.storage.v1.*; +import com.google.protobuf.Descriptors; @Path("/bigquery") public class BigQueryResource { @Inject BigQuery bigquery; + @Inject + BigQueryWriteClient bigQueryWriteClient; + + @ConfigProperty(name = "quarkus.google.cloud.project-id") + String projectId; + @GET @Produces(MediaType.TEXT_PLAIN) public String bigquery() throws InterruptedException { @@ -53,4 +67,35 @@ public String bigquery() throws InterruptedException { .map(row -> row.get("url").getStringValue() + " - " + row.get("view_count").getLongValue() + "\n") .collect(Collectors.joining()); } -} \ No newline at end of file + + @GET + @Path("/writeClient") + @Produces(MediaType.TEXT_PLAIN) + public String bigQueryWriteClient() + throws InterruptedException, Descriptors.DescriptorValidationException, IOException, ExecutionException { + TableName parentTable = TableName.of(projectId, "testdataset", "testtable"); + WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build(); + CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder() + .setParent(parentTable.toString()) + .setWriteStream(stream) + .build(); + WriteStream writeStream = bigQueryWriteClient.createWriteStream(createWriteStreamRequest); + try (JsonStreamWriter streamWriter = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) + .build()) { + long offset = 0; + for (int i = 0; i < 2; i++) { + // Create a JSON object that is compatible with the table schema. + JSONArray jsonArr = new JSONArray(); + for (int j = 0; j < 10; j++) { + JSONObject record = new JSONObject(); + record.put("col1", String.format("batch-record %03d-%03d", i, j)); + jsonArr.put(record); + } + streamWriter.append(jsonArr, offset).get(); + offset += jsonArr.length(); + } + } + + return "OK"; + } +}