diff --git a/build.sbt b/build.sbt index 5c756d92f2..987aa1cf3f 100644 --- a/build.sbt +++ b/build.sbt @@ -1720,6 +1720,7 @@ lazy val integration = project unusedCompileDependenciesTest := unusedCompileDependenciesTestSkipped.value, libraryDependencies ++= Seq( // compile + "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion, "com.google.api-client" % "google-api-client" % gcpBom.key.value, "com.google.apis" % "google-api-services-bigquery" % googleApiServicesBigQueryVersion, "com.google.guava" % "guava" % guavaVersion, diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala index 4035d4e438..a81d5c5231 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala @@ -48,7 +48,8 @@ object TypedBigQueryIT { timestamp: Instant, date: LocalDate, time: LocalTime, - datetime: LocalDateTime, + // BQ DATETIME is problematic with avro: export as 'string(datetime)', load as '(long)local-timestamp-micros' + // datetime: LocalDateTime, geography: Geography, json: Json, bigNumeric: BigNumeric @@ -116,19 +117,31 @@ object TypedBigQueryIT { class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll { import TypedBigQueryIT._ + private val bq = BigQuery.defaultInstance() + override protected def afterAll(): Unit = { - val bq = BigQuery.defaultInstance() bq.tables.delete(typedTable.ref) bq.tables.delete(tableRowTable.ref) bq.tables.delete(avroTable.ref) } + def waitForTable(table: Table.Spec): Unit = { + var retries = 0 + while (!bq.tables.exists(table.ref) && retries < 3) { + Thread.sleep(500) + retries += 1 + } + if (retries >= 3) throw new RuntimeException(s"Table $table not found") + } + "TypedBigQuery" should "handle records as TableRow" in { runWithRealContext(options) { sc => sc.parallelize(records) .saveAsTypedBigQueryTable(typedTable, createDisposition = CREATE_IF_NEEDED) }.waitUntilFinish() + waitForTable(typedTable) + runWithRealContext(options) { sc => val data = sc.typedBigQuery[Record](typedTable) data should containInAnyOrder(records) @@ -146,6 +159,8 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll { ) }.waitUntilFinish() + waitForTable(tableRowTable) + runWithRealContext(options) { sc => val data = sc.bigQueryTable(tableRowTable).map(Record.fromTableRow) data should containInAnyOrder(records) @@ -154,6 +169,7 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll { it should "handle records as avro format" in { implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(Record.avroSchema) + runWithRealContext(options) { sc => sc.parallelize(records) .map(Record.toAvro) @@ -164,6 +180,8 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll { ) }.waitUntilFinish() + waitForTable(avroTable) + runWithRealContext(options) { sc => val data = sc.bigQueryTable(avroTable, Format.GenericRecord).map(Record.fromAvro) data should containInAnyOrder(records) diff --git a/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsWrapper.java b/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsWrapper.java deleted file mode 100644 index a1af5d9c08..0000000000 --- a/scio-google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsWrapper.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright 2019 Spotify AB. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.beam.sdk.io.gcp.bigquery; - -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; -import java.util.ArrayList; -import java.util.List; -import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.Schema.Type; -import org.apache.avro.generic.GenericRecord; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; - -/** - * A set of utilities for working with Avro files. - * - *

These utilities are based on the Avro - * 1.8.1 specification. - */ -public class BigQueryAvroUtilsWrapper { - - /** - * Defines the valid mapping between BigQuery types and native Avro types. - * - *

Some BigQuery types are duplicated here since slightly different Avro records are produced - * when exporting data in Avro format and when reading data directly using the read API. - */ - static final ImmutableMap BIG_QUERY_TO_AVRO_TYPES = - ImmutableMap.builder() - .put("STRING", Type.STRING) - .put("GEOGRAPHY", Type.STRING) - .put("JSON", Type.STRING) - .put("BYTES", Type.BYTES) - .put("INTEGER", Type.LONG) - .put("FLOAT", Type.DOUBLE) - .put("NUMERIC", Type.BYTES) - .put("BOOLEAN", Type.BOOLEAN) - .put("TIMESTAMP", Type.LONG) - .put("RECORD", Type.RECORD) - .put("DATE", Type.INT) - .put("DATETIME", Type.STRING) - .put("TIME", Type.LONG) - .build(); - - public static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) { - return BigQueryAvroUtils.convertGenericRecordToTableRow(record, schema); - } - - public static Schema toGenericAvroSchema(String schemaName, List fieldSchemas) { - List avroFields = new ArrayList<>(); - for (TableFieldSchema bigQueryField : fieldSchemas) { - avroFields.add(convertField(bigQueryField)); - } - return Schema.createRecord( - schemaName, - "Translated Avro Schema for " + schemaName, - "org.apache.beam.sdk.io.gcp.bigquery", - false, - avroFields); - } - - private static Field convertField(TableFieldSchema bigQueryField) { - Type avroType = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType()); - if (avroType == null) { - throw new IllegalArgumentException( - "Unable to map BigQuery field type " + bigQueryField.getType() + " to avro type."); - } - - Schema elementSchema; - if (avroType == Type.RECORD) { - elementSchema = toGenericAvroSchema(bigQueryField.getName(), bigQueryField.getFields()); - } else { - elementSchema = Schema.create(avroType); - if (bigQueryField.getType().equals("DATE")) { - elementSchema = LogicalTypes.date().addToSchema(elementSchema); - } - if (bigQueryField.getType().equals("TIME")) { - elementSchema = LogicalTypes.timeMicros().addToSchema(elementSchema); - } - if (bigQueryField.getType().equals("DATETIME") && avroType != Type.STRING) { - throw new IllegalArgumentException("DateTime type is not supported"); - } - } - Schema fieldSchema; - if (bigQueryField.getMode() == null || "NULLABLE".equals(bigQueryField.getMode())) { - fieldSchema = Schema.createUnion(Schema.create(Type.NULL), elementSchema); - } else if ("REQUIRED".equals(bigQueryField.getMode())) { - fieldSchema = elementSchema; - } else if ("REPEATED".equals(bigQueryField.getMode())) { - fieldSchema = Schema.createArray(elementSchema); - } else { - throw new IllegalArgumentException( - String.format("Unknown BigQuery Field Mode: %s", bigQueryField.getMode())); - } - return new Field( - bigQueryField.getName(), - fieldSchema, - bigQueryField.getDescription(), - (Object) null /* Cast to avoid deprecated JsonNode constructor. */); - } -} diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala index 3cc3f6ab03..c767e0d60b 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala @@ -423,8 +423,8 @@ object BigQueryTypedTable { val writer = beam.BigQueryIO .write[T]() .withFormatFunction(Functions.serializableFn(wFn)) - val fn: (GenericRecord, TableSchema) => T = (gr, ts) => - tableRowFn(BigQueryUtils.convertGenericRecordToTableRow(gr, ts)) + val fn: (GenericRecord, TableSchema) => T = (gr, _) => + tableRowFn(BigQueryUtils.convertGenericRecordToTableRow(gr)) BigQueryTypedTable(reader, writer, table, fn) } @@ -437,13 +437,15 @@ object BigQueryTypedTable { ): BigQueryTypedTable[T] = { val rFn = ClosureCleaner.clean(readerFn) val wFn = ClosureCleaner.clean(writerFn) - val reader = beam.BigQueryIO.read(rFn(_)) + val reader = beam.BigQueryIO + .read(rFn(_)) + .useAvroLogicalTypes() val writer = beam.BigQueryIO .write[T]() .useAvroLogicalTypes() .withAvroFormatFunction(input => wFn(input.getElement())) .withAvroSchemaFactory { ts => - BigQueryAvroUtilsWrapper.toGenericAvroSchema("root", ts.getFields()) + BigQueryUtils.toGenericAvroSchema("root", ts.getFields(), true) } BigQueryTypedTable(reader, writer, table, fn) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala index 47818e3bba..43f2ef748d 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala @@ -176,10 +176,13 @@ object Timestamp { case t: Long => new Instant(t / 1000) case _ => parse(timestamp.toString) } + + def micros(timestamp: Instant): Long = timestamp.getMillis * 1000 } /** Utility for BigQuery `DATE` type. */ object Date { + private val EpochDate = new LocalDate(1970, 1, 1) // YYYY-[M]M-[D]D private[this] val Formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC() @@ -195,6 +198,8 @@ object Date { case d: Int => new LocalDate(0, DateTimeZone.UTC).plusDays(d) case _ => parse(date.toString) } + + def days(date: LocalDate): Int = Days.daysBetween(EpochDate, date).getDays } /** Utility for BigQuery `TIME` type. */ @@ -219,6 +224,8 @@ object Time { case t: Long => new LocalTime(t / 1000, DateTimeZone.UTC) case _ => parse(time.toString) } + + def micros(time: LocalTime): Long = time.millisOfDay().get().toLong * 1000 } /** Utility for BigQuery `DATETIME` type. */ @@ -324,4 +331,7 @@ object Numeric { case b: ByteBuffer => DecimalConverter.fromBytes(b, null, DecimalLogicalType) case _ => apply(value.toString) } + + def bytes(value: BigDecimal): ByteBuffer = + DecimalConverter.toBytes(value.bigDecimal, null, DecimalLogicalType) } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala index 5470947a42..ac70db1b45 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala @@ -29,7 +29,7 @@ import org.apache.avro.Schema import org.apache.avro.generic.{GenericDatumReader, GenericRecord} import org.apache.avro.io.{BinaryDecoder, DecoderFactory} import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition} -import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryAvroUtilsWrapper, BigQueryOptions} +import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryOptions, BigQueryUtils} import org.apache.beam.sdk.io.gcp.{bigquery => bq} import org.apache.beam.sdk.options.{ExecutorOptions, PipelineOptionsFactory} import org.joda.time.Instant @@ -67,7 +67,7 @@ final private[client] class TableOps(client: Client) { withBigQueryService { bqServices => val tb = bqServices.getTable(table.ref, readOptions.getSelectedFieldsList) storageAvroRows(table, readOptions).map { gr => - BigQueryAvroUtilsWrapper.convertGenericRecordToTableRow(gr, tb.getSchema) + BigQueryUtils.convertGenericRecordToTableRow(gr, tb.getSchema) } } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala index 4f00d823ba..2a3ce84045 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala @@ -180,17 +180,18 @@ private[types] object ConverterProvider { case t if t =:= typeOf[String] => tree case t if t =:= typeOf[BigDecimal] => - q"_root_.com.spotify.scio.bigquery.Numeric($tree).toString" + q"_root_.com.spotify.scio.bigquery.Numeric.bytes($tree)" case t if t =:= typeOf[ByteString] => q"_root_.java.nio.ByteBuffer.wrap($tree.toByteArray)" case t if t =:= typeOf[Array[Byte]] => q"_root_.java.nio.ByteBuffer.wrap($tree)" - case t if t =:= typeOf[Instant] => q"$tree.getMillis * 1000" + case t if t =:= typeOf[Instant] => + q"_root_.com.spotify.scio.bigquery.Timestamp.micros($tree)" case t if t =:= typeOf[LocalDate] => - q"_root_.com.spotify.scio.bigquery.Date($tree)" + q"_root_.com.spotify.scio.bigquery.Date.days($tree)" case t if t =:= typeOf[LocalTime] => - q"_root_.com.spotify.scio.bigquery.Time($tree)" + q"_root_.com.spotify.scio.bigquery.Time.micros($tree)" case t if t =:= typeOf[LocalDateTime] => q"_root_.com.spotify.scio.bigquery.DateTime($tree)" @@ -200,7 +201,7 @@ private[types] object ConverterProvider { case t if t =:= typeOf[Json] => q"$tree.wkt" case t if t =:= typeOf[BigNumeric] => - q"_root_.com.spotify.scio.bigquery.types.BigNumeric($tree.wkt).toString" + q"_root_.com.spotify.scio.bigquery.types.BigNumeric.bytes($tree)" // nested records case t if isCaseClass(c)(t) => diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala index b150b8a7c4..620e160731 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala @@ -37,7 +37,7 @@ private[types] object SchemaProvider { def avroSchemaOf[T: TypeTag]: Schema = AvroSchemaCache.get( typeTag[T].tpe.toString, - BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, schemaOf[T].getFields) + BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, schemaOf[T].getFields, true) ) def schemaOf[T: TypeTag]: TableSchema = diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala index 0dda1dbb25..23e8d0d5dd 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala @@ -110,5 +110,8 @@ package object types { case b: ByteBuffer => new BigNumeric(DecimalConverter.fromBytes(b, null, DecimalLogicalType)) case _ => apply(value.toString) } + + def bytes(value: BigNumeric): ByteBuffer = + DecimalConverter.toBytes(value.wkt.bigDecimal, null, DecimalLogicalType) } }