Skip to content

Commit

Permalink
Merge pull request #63 from tanakaryo/feat-how-to-bq
Browse files Browse the repository at this point in the history
#62 add new resource.
  • Loading branch information
tanakaryo authored Sep 7, 2024
2 parents 377c366 + 947d36b commit 9c16ffc
Show file tree
Hide file tree
Showing 24 changed files with 585 additions and 0 deletions.
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "java",
"name": "BqReadInsert",
"request": "launch",
"mainClass": "example.BqReadInsert",
"projectName": "bigquery-example"
},
{
"type": "java",
"name": "BqReadWriteBucket",
"request": "launch",
"mainClass": "example.BqReadWriteBucket",
"projectName": "bigquery-example"
},
{
"type": "java",
"name": "WriteMain",
"request": "launch",
"mainClass": "example.WriteMain",
"projectName": "bigquery-example"
},
{
"type": "java",
"name": "Current File",
"request": "launch",
"mainClass": "${file}"
},
{
"type": "java",
"name": "App",
"request": "launch",
"mainClass": "example.App",
"vmArgs": "--add-opens=java.base/java.nio=ALL-UNNAMED",
"projectName": "bigquery-example"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"java.configuration.updateBuildConfiguration": "interactive",
"java.compile.nullAnalysis.mode": "automatic"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>example</groupId>
<artifactId>bigquery-example</artifactId>
<version>1.0-SNAPSHOT</version>

<name>bigquery-example</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>

<properties>
<java.version>17</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.release>17</maven.compiler.release>
<arrow.version>15.0.2</arrow.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>26.44.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>1.41.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-bom</artifactId>
<version>${arrow.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquerystorage</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>2.42.1</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-logging</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud.opentelemetry</groupId>
<artifactId>exporter-metrics</artifactId>
<version>0.31.0</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M6</version>
<configuration>
<argLine>--add-opens=java.base/java.nio=ALL-UNNAMED</argLine>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package example;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;


public class App {

private static class SimpleRowReader implements AutoCloseable {

BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

// Decoder object will be reused to avoid re-allocation and too much garbage collection.
private final VectorSchemaRoot root;
private final VectorLoader loader;

public SimpleRowReader(ArrowSchema arrowSchema) throws IOException {
Schema schema =
MessageSerializer.deserializeSchema(
new ReadChannel(
new ByteArrayReadableSeekableByteChannel(
arrowSchema.getSerializedSchema().toByteArray())));
Preconditions.checkNotNull(schema);
List<FieldVector> vectors = new ArrayList<>();
for (Field field : schema.getFields()) {
vectors.add(field.createVector(allocator));
}
root = new VectorSchemaRoot(vectors);
loader = new VectorLoader(root);
}

/**
* Sample method for processing Arrow data which only validates decoding.
*
* @param batch object returned from the ReadRowsResponse.
*/
public void processRows(ArrowRecordBatch batch) throws IOException {
org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch =
MessageSerializer.deserializeRecordBatch(
new ReadChannel(
new ByteArrayReadableSeekableByteChannel(
batch.getSerializedRecordBatch().toByteArray())),
allocator);

loader.load(deserializedBatch);
// Release buffers from batch (they are still held in the vectors in root).
deserializedBatch.close();
System.out.println(root.contentToTSVString());
// Release buffers from vectors in root.
root.clear();
}

@Override
public void close() {
root.close();
allocator.close();
}
}
public static void main(String[] args) throws Exception {
String projectId = "aspf-jp-test";
Integer snapshotMillis = null;
if (args.length > 1) {
snapshotMillis = Integer.parseInt(args[1]);
}

try (BigQueryReadClient client = BigQueryReadClient.create()) {
String parent = String.format("projects/%s", projectId);

// This example uses baby name data from the public datasets.
String srcTable =
String.format(
"projects/%s/datasets/%s/tables/%s",
"aspf-jp-test", "my_dataset", "test_table1");

// We specify the columns to be projected by adding them to the selected fields,
// and set a simple filter to restrict which rows are transmitted.
List<String> columnLst = new ArrayList<>();
columnLst.add("record_num");
columnLst.add("name");
columnLst.add("age");
columnLst.add("address");
TableReadOptions options =
TableReadOptions.newBuilder()
.addAllSelectedFields(columnLst)
.setRowRestriction(" record_num BETWEEN 1 AND 50 ")
.build();

// Start specifying the read session we want created.
ReadSession.Builder sessionBuilder =
ReadSession.newBuilder()
.setTable(srcTable)
// This API can also deliver data serialized in Apache Avro format.
// This example leverages Apache Arrow.
.setDataFormat(DataFormat.ARROW)
.setReadOptions(options);

// Optionally specify the snapshot time. When unspecified, snapshot time is "now".
if (snapshotMillis != null) {
Timestamp t =
Timestamp.newBuilder()
.setSeconds(snapshotMillis / 1000)
.setNanos((int) ((snapshotMillis % 1000) * 1000000))
.build();
TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build();
sessionBuilder.setTableModifiers(modifiers);
}

// Begin building the session creation request.
CreateReadSessionRequest.Builder builder =
CreateReadSessionRequest.newBuilder()
.setParent(parent)
.setReadSession(sessionBuilder)
.setMaxStreamCount(1);

ReadSession session = client.createReadSession(builder.build());
// Setup a simple reader and start a read session.
try (SimpleRowReader reader = new SimpleRowReader(session.getArrowSchema())) {

// Assert that there are streams available in the session. An empty table may not have
// data available. If no sessions are available for an anonymous (cached) table, consider
// writing results of a query to a named table rather than consuming cached results
// directly.
Preconditions.checkState(session.getStreamsCount() > 0);

// Use the first stream to perform reading.
String streamName = session.getStreams(0).getName();

ReadRowsRequest readRowsRequest =
ReadRowsRequest.newBuilder().setReadStream(streamName).build();

// Process each block of rows as they arrive and decode using our simple row reader.
ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
for (ReadRowsResponse response : stream) {
Preconditions.checkState(response.hasArrowRecordBatch());
reader.processRows(response.getArrowRecordBatch());
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package example;

import java.util.UUID;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;

public class BqReadInsert {
public static void main(String[] args) throws Exception {
BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(
"INSERT INTO `test_dataset.for_ins_table` SELECT * FROM `my_dataset_listing.test_table1`"
).setUseLegacySql(false)
.build();

JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

queryJob = queryJob.waitFor();

if (queryJob == null) {
throw new RuntimeException("Job is failed.");
} else if (queryJob.getStatus().getError() != null) {
throw new RuntimeException("Error :" + queryJob.getStatus().getError().toString());
}
System.out.println("SUCCESS");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package example;

import java.util.UUID;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;

public class BqReadWriteBucket {
public static void main(String[] args) throws Exception {
BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(
"SELECT * FROM my_dataset_listing.test_table1 ORDER BY record_num ASC"
).setUseLegacySql(false)
.build();

JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

queryJob = queryJob.waitFor();

if (queryJob == null) {
throw new RuntimeException("Job is failed.");
} else if (queryJob.getStatus().getError() != null) {
throw new RuntimeException("Error :" + queryJob.getStatus().getError().toString());
}

TableResult result = queryJob.getQueryResults();

for (FieldValueList row : result.iterateAll()) {
System.out.println(row.get("name").getStringValue());
}
}
}
Loading

0 comments on commit 9c16ffc

Please sign in to comment.