diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala index b0e6dd9dbb..74cba90165 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala @@ -24,7 +24,7 @@ import com.spotify.scio.bigquery.BigQueryTypedTable.Format import com.spotify.scio.bigquery.client.BigQuery import com.spotify.scio.testing._ import magnolify.scalacheck.auto._ -import org.apache.avro.{LogicalTypes, Schema} +import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder} import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} import org.apache.beam.sdk.options.PipelineOptionsFactory import org.joda.time.{Instant, LocalDate, LocalDateTime, LocalTime} @@ -166,28 +166,17 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll { it should "write GenericRecord records with logical types" in { val sc = ScioContext(options) - import scala.jdk.CollectionConverters._ - val schema: Schema = Schema.createRecord( - "Record", - "", - "com.spotify.scio.bigquery", - false, - List( - new Schema.Field( - "date", - LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)), - "", - 0 - ), - new Schema.Field( - "time", - LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG)), - "", - 0L - ), - new Schema.Field("datetime", Schema.create(Schema.Type.STRING), "", "") - ).asJava - ) + // format: off + val schema: Schema = SchemaBuilder + .record("Record") + .namespace("com.spotify.scio.bigquery") + .fields() + .name("date").`type`(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))).withDefault(0) + .name("time").`type`(LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG))).withDefault(0L) + .name("datetime").`type`().stringType().stringDefault("") + .endRecord() + // format: on + implicit val coder = 
avroGenericRecordCoder(schema) val ltRecords: Seq[GenericRecord] = Seq( diff --git a/integration/src/test/scala/com/spotify/scio/smb/SortMergeBucketParityIT.scala b/integration/src/test/scala/com/spotify/scio/smb/SortMergeBucketParityIT.scala index 01bb28fa10..8d3c1198c9 100644 --- a/integration/src/test/scala/com/spotify/scio/smb/SortMergeBucketParityIT.scala +++ b/integration/src/test/scala/com/spotify/scio/smb/SortMergeBucketParityIT.scala @@ -26,7 +26,7 @@ import com.spotify.scio.util.MultiJoin import com.spotify.scio.values.SCollection import org.apache.avro.Schema import org.apache.avro.Schema.Field -import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} import org.apache.beam.sdk.extensions.smb.{AvroSortedBucketIO, SortedBucketIO, TargetParallelism} import org.apache.beam.sdk.values.TupleTag import org.apache.commons.io.FileUtils @@ -213,10 +213,10 @@ class SortMergeBucketParityIT extends AnyFlatSpec with Matchers { val outputPaths = (0 until numSources).map { n => val data = (0 to Random.nextInt(100)).map { i => - val gr: GenericRecord = new GenericData.Record(schema) - gr.put("key", i) - gr.put("value", s"v$i") - gr + new GenericRecordBuilder(schema) + .set("key", i) + .set("value", s"v$i") + .build() } val outputPath = new File(tempFolder, s"source$n") diff --git a/scio-avro/src/main/scala/com/spotify/scio/avro/dynamic/syntax/AvroDynamicSCollectionSyntax.scala b/scio-avro/src/main/scala/com/spotify/scio/avro/dynamic/syntax/AvroDynamicSCollectionSyntax.scala index 62175d8f7e..bf531e6fdc 100644 --- a/scio-avro/src/main/scala/com/spotify/scio/avro/dynamic/syntax/AvroDynamicSCollectionSyntax.scala +++ b/scio-avro/src/main/scala/com/spotify/scio/avro/dynamic/syntax/AvroDynamicSCollectionSyntax.scala @@ -19,7 +19,7 @@ package com.spotify.scio.avro.dynamic.syntax import com.google.protobuf.Message import com.spotify.scio.avro.AvroIO -import com.spotify.scio.coders.{AvroBytesUtil, Coder, 
CoderMaterializer} +import com.spotify.scio.coders.{AvroBytesUtil, Coder} import com.spotify.scio.io.{ClosedTap, EmptyTap} import com.spotify.scio.io.dynamic.syntax.DynamicSCollectionOps import com.spotify.scio.protobuf.util.ProtobufUtil @@ -142,22 +142,19 @@ final class DynamicProtobufSCollectionOps[T <: Message](private val self: SColle tempDirectory: String = AvroIO.WriteParam.DefaultTempDirectory, prefix: String = AvroIO.WriteParam.DefaultPrefix )(destinationFn: T => String)(implicit ct: ClassTag[T]): ClosedTap[Nothing] = { - val protoCoder = Coder.protoMessageCoder[T] - val elemCoder = CoderMaterializer.beam(self.context, protoCoder) - val avroSchema = AvroBytesUtil.schema - val nm = new JHashMap[String, AnyRef]() - nm.putAll((metadata ++ ProtobufUtil.schemaMetadataOf(ct)).asJava) - if (self.context.isTest) { throw new NotImplementedError( "Protobuf file with dynamic destinations cannot be used in a test context" ) } else { + implicit val protoCoder: Coder[T] = Coder.protoMessageCoder[T] + val nm = new JHashMap[String, AnyRef]() + nm.putAll((metadata ++ ProtobufUtil.schemaMetadataOf(ct)).asJava) + val sink = BAvroIO - .sinkViaGenericRecords( - avroSchema, - (element: T, _: Schema) => AvroBytesUtil.encode(elemCoder, element) - ) + .sink(AvroBytesUtil.schema) + .asInstanceOf[BAvroIO.Sink[T]] + .withDatumWriterFactory(AvroBytesUtil.datumWriterFactory) .withCodec(codec) .withMetadata(nm) val write = writeDynamic( diff --git a/scio-avro/src/main/scala/com/spotify/scio/coders/AvroBytesUtil.scala b/scio-avro/src/main/scala/com/spotify/scio/coders/AvroBytesUtil.scala index ebca52ca9f..1e86359ba3 100644 --- a/scio-avro/src/main/scala/com/spotify/scio/coders/AvroBytesUtil.scala +++ b/scio-avro/src/main/scala/com/spotify/scio/coders/AvroBytesUtil.scala @@ -17,41 +17,47 @@ package com.spotify.scio.coders -import org.apache.avro.generic.{GenericData, GenericRecord} -import org.apache.avro.{Schema => ASchema} +import org.apache.avro.generic.{GenericDatumWriter, 
GenericRecord, GenericRecordBuilder} +import org.apache.avro.io.{DatumWriter, Encoder} +import org.apache.avro.{Schema, SchemaBuilder} import org.apache.beam.sdk.coders.{Coder => BCoder} +import org.apache.beam.sdk.extensions.avro.io.AvroSink.DatumWriterFactory import org.apache.beam.sdk.util.CoderUtils import java.nio.ByteBuffer -import scala.jdk.CollectionConverters._ private[scio] object AvroBytesUtil { - val schema: ASchema = { - val s = ASchema.createRecord("AvroBytesRecord", null, null, false) - s.setFields( - List( - new ASchema.Field( - "bytes", - ASchema.create(ASchema.Type.BYTES), - null, - null.asInstanceOf[Object] - ) - ).asJava - ) - s + val schema: Schema = SchemaBuilder + .record("AvroBytesRecord") + .fields() + .requiredBytes("bytes") + .endRecord() + + private val byteField = schema.getField("bytes") + + def datumWriterFactory[T: Coder]: DatumWriterFactory[T] = { + val bCoder = CoderMaterializer.beamWithDefault(Coder[T]) + (schema: Schema) => + new DatumWriter[T] { + private val underlying = new GenericDatumWriter[GenericRecord](schema) + + override def setSchema(schema: Schema): Unit = underlying.setSchema(schema) + + override def write(datum: T, out: Encoder): Unit = + underlying.write(AvroBytesUtil.encode(bCoder, datum), out) + } } def encode[T](coder: BCoder[T], obj: T): GenericRecord = { val bytes = CoderUtils.encodeToByteArray(coder, obj) - val record = new GenericData.Record(schema) - record.put("bytes", ByteBuffer.wrap(bytes)) - record + new GenericRecordBuilder(schema) + .set(byteField, ByteBuffer.wrap(bytes)) + .build() } def decode[T](coder: BCoder[T], record: GenericRecord): T = { - val bb = record.get("bytes").asInstanceOf[ByteBuffer] - val bytes = - java.util.Arrays.copyOfRange(bb.array(), bb.position(), bb.limit()) + val bb = record.get(byteField.pos()).asInstanceOf[ByteBuffer] + val bytes = java.util.Arrays.copyOfRange(bb.array(), bb.position(), bb.limit()) CoderUtils.decodeFromByteArray(coder, bytes) } } diff --git 
a/scio-avro/src/test/scala/com/spotify/scio/avro/types/SchemaUtilTest.scala b/scio-avro/src/test/scala/com/spotify/scio/avro/types/SchemaUtilTest.scala index 1bdc1b3d75..2cb92ede9a 100644 --- a/scio-avro/src/test/scala/com/spotify/scio/avro/types/SchemaUtilTest.scala +++ b/scio-avro/src/test/scala/com/spotify/scio/avro/types/SchemaUtilTest.scala @@ -19,7 +19,7 @@ package com.spotify.scio.avro.types import com.spotify.scio.avro.types.Schemas._ import com.spotify.scio.avro.types.Schemas.FieldMode._ -import org.apache.avro.Schema +import org.apache.avro.{Schema, SchemaBuilder} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -149,16 +149,11 @@ class SchemaUtilTest extends AnyFlatSpec with Matchers { val expectedFields = SchemaUtil.scalaReservedWords .map(e => s"`$e`") .mkString(start = "", sep = ": Long, ", end = ": Long") - val schema = - Schema.createRecord( - "Row", - null, - null, - false, - SchemaUtil.scalaReservedWords.map { name => - new Schema.Field(name, Schema.create(Schema.Type.LONG), null, null.asInstanceOf[Any]) - }.asJava - ) + + val schema = SchemaUtil.scalaReservedWords + .foldLeft(SchemaBuilder.record("Row").fields())(_.requiredLong(_)) + .endRecord() + SchemaUtil.toPrettyString1(schema) shouldBe s"case class Row($expectedFields)" } } diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/AvroExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/AvroExample.scala index 25934777dd..dd9c7dcc29 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/AvroExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/AvroExample.scala @@ -27,10 +27,8 @@ import com.spotify.scio._ import com.spotify.scio.avro._ import com.spotify.scio.avro.types.AvroType import com.spotify.scio.io.ClosedTap -import org.apache.avro.Schema -import org.apache.avro.generic.{GenericData, 
GenericRecord} - -import scala.jdk.CollectionConverters._ +import org.apache.avro.{Schema, SchemaBuilder} +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} object AvroExample { @AvroType.fromSchema("""{ @@ -107,12 +105,12 @@ object AvroExample { implicit def genericCoder = avroGenericRecordCoder(schema) sc.parallelize(1 to 100) .map[GenericRecord] { i => - val r = new GenericData.Record(schema) - r.put("id", i) - r.put("amount", i.toDouble) - r.put("name", "account" + i) - r.put("type", "checking") - r + new GenericRecordBuilder(schema) + .set("id", i) + .set("amount", i.toDouble) + .set("name", "account" + i) + .set("type", "checking") + .build() } .saveAsAvroFile(args("output"), schema = schema) } @@ -133,24 +131,12 @@ object AvroExample { .map(_.toString) .saveAsTextFile(args("output")) - val schema: Schema = { - def f(name: String, tpe: Schema.Type) = - new Schema.Field( - name, - Schema.createUnion(List(Schema.create(Schema.Type.NULL), Schema.create(tpe)).asJava), - null: String, - null: AnyRef - ) - - val s = Schema.createRecord("GenericAccountRecord", null, null, false) - s.setFields( - List( - f("id", Schema.Type.INT), - f("amount", Schema.Type.DOUBLE), - f("name", Schema.Type.STRING), - f("type", Schema.Type.STRING) - ).asJava - ) - s - } + val schema: Schema = SchemaBuilder + .record("GenericAccountRecord") + .fields() + .requiredInt("id") + .requiredDouble("amount") + .requiredString("name") + .requiredString("type") + .endRecord() } diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/SortMergeBucketExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/SortMergeBucketExample.scala index 4f1c1bfc16..3d0093d163 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/SortMergeBucketExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/SortMergeBucketExample.scala @@ -32,7 +32,7 @@ import com.spotify.scio.coders.Coder import 
com.spotify.scio.values.SCollection import org.apache.avro.Schema import org.apache.avro.file.CodecFactory -import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} import org.apache.beam.sdk.extensions.smb.BucketMetadata.HashType import org.apache.beam.sdk.extensions.smb.{AvroSortedBucketIO, TargetParallelism} import org.apache.beam.sdk.values.TupleTag @@ -58,13 +58,10 @@ object SortMergeBucketExample { |""".stripMargin ) - def user(id: String, age: Int): GenericRecord = { - val gr = new GenericData.Record(UserDataSchema) - gr.put("userId", id) - gr.put("age", age) - - gr - } + def user(id: String, age: Int): GenericRecord = new GenericRecordBuilder(UserDataSchema) + .set("userId", id) + .set("age", age) + .build() } object SortMergeBucketWriteExample { diff --git a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyAvroExampleTest.scala b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyAvroExampleTest.scala index f5d8cda526..478d8bed46 100644 --- a/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyAvroExampleTest.scala +++ b/scio-examples/src/test/scala/com/spotify/scio/examples/extra/MagnolifyAvroExampleTest.scala @@ -20,7 +20,7 @@ package com.spotify.scio.examples.extra import com.spotify.scio.avro.AvroIO import com.spotify.scio.io._ import com.spotify.scio.testing._ -import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} class MagnolifyAvroExampleTest extends PipelineSpec { import MagnolifyAvroExample._ @@ -28,10 +28,10 @@ class MagnolifyAvroExampleTest extends PipelineSpec { val textIn: Seq[String] = Seq("a b c d e", "a b a b") val wordCount: Seq[(String, Long)] = Seq(("a", 3L), ("b", 3L), ("c", 1L), ("d", 1L), ("e", 1L)) val records: Seq[GenericRecord] = wordCount.map { kv => - val r = new GenericData.Record(wordCountType.schema) - r.put("word", kv._1) 
- r.put("count", kv._2) - r + new GenericRecordBuilder(wordCountType.schema) + .set("word", kv._1) + .set("count", kv._2) + .build() } val textOut: Seq[String] = wordCount.map(kv => kv._1 + ": " + kv._2) diff --git a/scio-extra/src/test/scala/com/spotify/scio/extra/bigquery/ToTableRowTest.scala b/scio-extra/src/test/scala/com/spotify/scio/extra/bigquery/ToTableRowTest.scala index 69b33d3577..f518974c5d 100644 --- a/scio-extra/src/test/scala/com/spotify/scio/extra/bigquery/ToTableRowTest.scala +++ b/scio-extra/src/test/scala/com/spotify/scio/extra/bigquery/ToTableRowTest.scala @@ -19,10 +19,9 @@ package com.spotify.scio.extra.bigquery import java.math.{BigDecimal => JBigDecimal} import java.nio.ByteBuffer - import com.google.protobuf.ByteString import com.spotify.scio.bigquery.TableRow -import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder import org.apache.avro.generic.GenericData.EnumSymbol import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding import org.joda.time.{DateTime, LocalDate, LocalTime} @@ -72,29 +71,24 @@ class ToTableRowTest extends AnyFlatSpec with Matchers { it should "convert a GenericRecord to TableRow" in { val enumSchema = AvroExample.SCHEMA$.getField("enumField").schema() - val nestedAvro = new GenericData.Record(NestedAvro.SCHEMA$) - nestedAvro.put("nestedField", "nestedValue") + val nestedAvro = new GenericRecordBuilder(NestedAvro.SCHEMA$) + .set("nestedField", "nestedValue") + .build() - val genericRecord = new GenericData.Record(AvroExample.SCHEMA$) - genericRecord.put("booleanField", true) - genericRecord.put("stringField", "someString") - genericRecord.put("doubleField", 1.0) - genericRecord.put("longField", 1L) - genericRecord.put("intField", 1) - genericRecord.put("floatField", 1f) - genericRecord.put( - "bytesField", - ByteBuffer.wrap(ByteString.copyFromUtf8("%20cフーバー").toByteArray) - ) - genericRecord.put("arrayField", List(nestedAvro).asJava) - 
genericRecord.put("unionField", "someUnion") - genericRecord.put( - "mapField", - Map("mapKey" -> 1.0d).asJava - .asInstanceOf[java.util.Map[java.lang.CharSequence, java.lang.Double]] - ) - genericRecord.put("enumField", new EnumSymbol(enumSchema, Kind.FOO.toString)) - genericRecord.put("fixedField", new fixedType("%20cフーバー".getBytes())) + val genericRecord = new GenericRecordBuilder(AvroExample.SCHEMA$) + .set("booleanField", true) + .set("stringField", "someString") + .set("doubleField", 1.0) + .set("longField", 1L) + .set("intField", 1) + .set("floatField", 1f) + .set("bytesField", ByteBuffer.wrap(ByteString.copyFromUtf8("%20cフーバー").toByteArray)) + .set("arrayField", List(nestedAvro).asJava) + .set("unionField", "someUnion") + .set("mapField", Map[CharSequence, java.lang.Double]("mapKey" -> 1.0d).asJava) + .set("enumField", new EnumSymbol(enumSchema, Kind.FOO.toString)) + .set("fixedField", new fixedType("%20cフーバー".getBytes())) + .build() AvroConverters.toTableRow(genericRecord) shouldEqual expectedOutput } diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadataTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadataTest.java index a56a901b94..bf7280a277 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadataTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadataTest.java @@ -29,6 +29,7 @@ import java.util.List; import org.apache.avro.Schema; import org.apache.avro.Schema.Type; +import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.beam.sdk.coders.CannotProvideCoderException; @@ -38,7 +39,6 @@ import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser; import org.apache.beam.sdk.extensions.smb.BucketMetadata.HashType; import org.apache.beam.sdk.transforms.display.DisplayData; -import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Test; @@ -47,42 +47,43 @@ public class AvroBucketMetadataTest { static final Schema LOCATION_SCHEMA = - Schema.createRecord( - "Location", - "", - "org.apache.beam.sdk.extensions.smb.avro", - false, - Lists.newArrayList( - new Schema.Field("countryId", Schema.create(Type.BYTES), "", ""), - new Schema.Field( - "postalCode", - Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.BYTES)), - "", - ""), - new Schema.Field( - "prevCountries", - Schema.createArray(Schema.create(Schema.Type.STRING)), - "", - Collections.emptyList()))); - - static final Schema LOCATION_UNION_SCHEMA = - Schema.createUnion(Schema.create(Type.NULL), LOCATION_SCHEMA); - + SchemaBuilder.record("Location") + .namespace("org.apache.beam.sdk.extensions.smb.avro") + .fields() + .requiredBytes("countryId") + .optionalBytes("postalCode") + .name("prevCountries") + .type() + .array() + .items() + .stringType() + .arrayDefault(Collections.emptyList()) + .endRecord(); static final Schema RECORD_SCHEMA = - Schema.createRecord( - "Record", - "", - "org.apache.beam.sdk.extensions.smb.avro", - false, - Lists.newArrayList( - new Schema.Field("id", Schema.create(Schema.Type.LONG), "", 0L), - new Schema.Field("location", LOCATION_SCHEMA, "", Collections.emptyList()), - new Schema.Field("locationUnion", LOCATION_UNION_SCHEMA, "", Collections.emptyList()), - new Schema.Field( - "suffix", - Schema.createEnum("Suffix", "", "", Lists.newArrayList("Jr", "Sr", "None")), - "", - "None"))); + SchemaBuilder.record("Record") + .namespace("org.apache.beam.sdk.extensions.smb.avro") + .fields() + .name("id") + .type() + .longType() + .longDefault(0L) + .name("location") + .type(LOCATION_SCHEMA) + .noDefault() + .name("locationUnion") + .type() + .unionOf() + .nullType() + .and() + .type(LOCATION_SCHEMA) + .endUnion() + .noDefault() + .name("suffix") + .type() + 
.enumeration("Suffix") + .symbols("Jr", "Sr", "None") + .enumDefault("None") + .endRecord(); @Test public void testGenericRecord() throws Exception { @@ -480,12 +481,12 @@ public void skipsNullSecondaryKeys() private static Schema createUnionRecordOfTypes(Schema.Type... types) { final List typeSchemas = new ArrayList<>(); Arrays.asList(types).forEach(t -> typeSchemas.add(Schema.create(t))); - return Schema.createRecord( - "Record", - "", - "org.apache.beam.sdk.extensions.smb.avro", - false, - Lists.newArrayList( - new Schema.Field("unionField", Schema.createUnion(typeSchemas), "", ""))); + return SchemaBuilder.record("Record") + .namespace("org.apache.beam.sdk.extensions.smb.avro") + .fields() + .name("unionField") + .type(Schema.createUnion(typeSchemas)) + .noDefault() + .endRecord(); } } diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/AvroFileOperationsTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/AvroFileOperationsTest.java index 72f9e16ee4..16aec5235f 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/AvroFileOperationsTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/AvroFileOperationsTest.java @@ -27,8 +27,8 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.apache.avro.JsonProperties; import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; @@ -44,7 +44,6 @@ import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Rule; @@ -56,14 +55,12 @@ public class AvroFileOperationsTest { @Rule public final 
TemporaryFolder output = new TemporaryFolder(); private static final Schema USER_SCHEMA = - Schema.createRecord( - "User", - "", - "org.apache.beam.sdk.extensions.smb.avro", - false, - Lists.newArrayList( - new Schema.Field("name", Schema.create(Schema.Type.STRING), "", ""), - new Schema.Field("age", Schema.create(Schema.Type.INT), "", 0))); + SchemaBuilder.record("User") + .namespace("org.apache.beam.sdk.extensions.smb.avro") + .fields() + .requiredString("name") + .requiredInt("age") + .endRecord(); private static final Map TEST_METADATA = ImmutableMap.of("foo", "bar"); @@ -149,19 +146,16 @@ public void testDisplayData() { @Test public void testMap2649() throws Exception { final Schema schema = - Schema.createRecord( - "Record", - "", - "org.apache.beam.sdk.extensions.smb.avro", - false, - Lists.newArrayList( - new Schema.Field( - "map", - Schema.createUnion( - Schema.create(Schema.Type.NULL), - Schema.createMap(Schema.create(Schema.Type.STRING))), - "", - JsonProperties.NULL_VALUE))); + SchemaBuilder.record("Record") + .namespace("org.apache.beam.sdk.extensions.smb.avro") + .fields() + .name("map") + .type() + .optional() + .map() + .values() + .stringType() + .endRecord(); final AvroFileOperations fileOperations = AvroFileOperations.of(schema); final ResourceId file = diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/MixedSourcesEndToEndTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/MixedSourcesEndToEndTest.java index c8cc75f081..8ad54f9e7b 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/MixedSourcesEndToEndTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/MixedSourcesEndToEndTest.java @@ -31,8 +31,8 @@ import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; import org.apache.avro.file.CodecFactory; -import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; import 
org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; @@ -114,19 +114,19 @@ private static List empty() { } private static GenericRecord GR(String firstname, String lastname, int age) { - GenericData.Record result = new GenericData.Record(GR_USER_SCHEMA); - result.put( - "firstname", - Optional.ofNullable(firstname) - .map(n -> ByteBuffer.wrap(ByteString.copyFromUtf8(n).toByteArray())) - .orElse(null)); - result.put( - "lastname", - Optional.ofNullable(lastname) - .map(n -> ByteBuffer.wrap(ByteString.copyFromUtf8(n).toByteArray())) - .orElse(null)); - result.put("age", age); - return result; + return new GenericRecordBuilder(GR_USER_SCHEMA) + .set( + "firstname", + Optional.ofNullable(firstname) + .map(n -> ByteBuffer.wrap(ByteString.copyFromUtf8(n).toByteArray())) + .orElse(null)) + .set( + "lastname", + Optional.ofNullable(lastname) + .map(n -> ByteBuffer.wrap(ByteString.copyFromUtf8(n).toByteArray())) + .orElse(null)) + .set("age", age) + .build(); } private static TableRow Json(String firstname, String lastname, String country) { diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperationsTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperationsTest.java index 86253db795..d4c62c5c88 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperationsTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperationsTest.java @@ -27,6 +27,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser; @@ -35,7 +36,6 @@ import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.transforms.display.DisplayData; 
import org.apache.beam.sdk.util.MimeTypes; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.avro.AvroDataSupplier; import org.apache.parquet.avro.AvroReadSupport; @@ -56,14 +56,18 @@ public class ParquetAvroFileOperationsTest { @Rule public final TemporaryFolder output = new TemporaryFolder(); private static final Schema USER_SCHEMA = - Schema.createRecord( - "User", - "", - "org.apache.beam.sdk.extensions.smb.avro", - false, - Lists.newArrayList( - new Schema.Field("name", Schema.create(Schema.Type.STRING), "", ""), - new Schema.Field("age", Schema.create(Schema.Type.INT), "", 0))); + SchemaBuilder.record("User") + .namespace("org.apache.beam.sdk.extensions.smb.avro") + .fields() + .name("name") + .type() + .stringType() + .stringDefault("") + .name("age") + .type() + .intType() + .intDefault(0) + .endRecord(); private static final List USER_RECORDS = IntStream.range(0, 10) @@ -165,13 +169,11 @@ public void testGenericProjection() throws Exception { writeFile(file); final Schema projection = - Schema.createRecord( - "UserProjection", - "", - "org.apache.beam.sdk.extensions.smb.avro", - false, - Lists.newArrayList( - new Schema.Field("name", Schema.create(Schema.Type.STRING), "", ""))); + SchemaBuilder.record("UserProjection") + .namespace("org.apache.beam.sdk.extensions.smb.avro") + .fields() + .requiredString("name") + .endRecord(); final Configuration configuration = new Configuration(); AvroReadSupport.setRequestedProjection(configuration, projection); @@ -192,13 +194,12 @@ public void testGenericProjection() throws Exception { @Test public void testSpecificProjection() throws Exception { final Schema projection = - Schema.createRecord( - "AvroGeneratedUserProjection", - "", - "org.apache.beam.sdk.extensions.smb", - false, - Lists.newArrayList( - new Schema.Field("name", Schema.create(Schema.Type.STRING), "", ""))); + 
SchemaBuilder.record("AvroGeneratedUserProjection") + .namespace("org.apache.beam.sdk.extensions.smb") + .fields() + .requiredString("name") + .endRecord(); + final Configuration configuration = new Configuration(); AvroReadSupport.setRequestedProjection(configuration, projection); diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIOTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIOTest.java index 2eeca2e566..5490700c74 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIOTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/ParquetAvroSortedBucketIOTest.java @@ -17,7 +17,7 @@ package org.apache.beam.sdk.extensions.smb; -import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser; @@ -25,7 +25,6 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.filter2.predicate.FilterApi; @@ -41,10 +40,7 @@ public class ParquetAvroSortedBucketIOTest { public void testReadSerializable() { final Configuration conf = new Configuration(); AvroReadSupport.setRequestedProjection( - conf, - Schema.createRecord( - Lists.newArrayList( - new Schema.Field("name", Schema.create(Schema.Type.STRING), "", "")))); + conf, SchemaBuilder.record("Record").fields().requiredString("name").endRecord()); SerializableUtils.ensureSerializable( SortedBucketIO.read(String.class) diff --git a/scio-smb/src/test/scala/org/apache/beam/sdk/extensions/smb/ParquetEndToEndTest.scala 
b/scio-smb/src/test/scala/org/apache/beam/sdk/extensions/smb/ParquetEndToEndTest.scala index 8f430d51c1..78e6618290 100644 --- a/scio-smb/src/test/scala/org/apache/beam/sdk/extensions/smb/ParquetEndToEndTest.scala +++ b/scio-smb/src/test/scala/org/apache/beam/sdk/extensions/smb/ParquetEndToEndTest.scala @@ -24,7 +24,7 @@ import com.spotify.scio.avro._ import com.spotify.scio.coders.Coder import com.spotify.scio.smb._ import com.spotify.scio.testing._ -import org.apache.avro.Schema +import org.apache.avro.{Schema, SchemaBuilder} import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser import org.apache.beam.sdk.values.TupleTag @@ -33,28 +33,22 @@ import scala.jdk.CollectionConverters._ import scala.collection.compat._ object ParquetEndToEndTest { - val eventSchema: Schema = Schema.createRecord( - "Event", - "", - "org.apache.beam.sdk.extensions.smb.avro", - false, - List( - new Schema.Field("user", Schema.create(Schema.Type.STRING), "", ""), - new Schema.Field("event", Schema.create(Schema.Type.STRING), "", ""), - new Schema.Field("timestamp", Schema.create(Schema.Type.INT), "", 0) - ).asJava - ) - - val userSchema: Schema = Schema.createRecord( - "User", - "", - "org.apache.beam.sdk.extensions.smb.avro", - false, - List( - new Schema.Field("name", Schema.create(Schema.Type.STRING), "", ""), - new Schema.Field("age", Schema.create(Schema.Type.INT), "", 0) - ).asJava - ) + val eventSchema: Schema = SchemaBuilder + .record("Event") + .namespace("org.apache.beam.sdk.extensions.smb.avro") + .fields() + .requiredString("user") + .requiredString("event") + .requiredInt("timestamp") + .endRecord() + + val userSchema: Schema = SchemaBuilder + .record("User") + .namespace("org.apache.beam.sdk.extensions.smb.avro") + .fields() + .requiredString("name") + .requiredInt("age") + .endRecord() def avroEvent(x: Int): GenericRecord = new GenericRecordBuilder(eventSchema) .set("user", s"user${x % 10 + 1}") 
diff --git a/scio-test/src/test/scala/com/spotify/scio/avro/AvroUtils.scala b/scio-test/src/test/scala/com/spotify/scio/avro/AvroUtils.scala index 07a760efcb..6405028eed 100644 --- a/scio-test/src/test/scala/com/spotify/scio/avro/AvroUtils.scala +++ b/scio-test/src/test/scala/com/spotify/scio/avro/AvroUtils.scala @@ -17,47 +17,40 @@ package com.spotify.scio.avro -import org.apache.avro.Schema -import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} +import org.apache.avro.{Schema, SchemaBuilder} import scala.jdk.CollectionConverters._ object AvroUtils { - private def f(name: String, tpe: Schema.Type) = - new Schema.Field( - name, - Schema.createUnion(List(Schema.create(Schema.Type.NULL), Schema.create(tpe)).asJava), - null: String, - null: AnyRef - ) - - private def fArr(name: String, tpe: Schema.Type) = - new Schema.Field(name, Schema.createArray(Schema.create(tpe)), null: String, null: AnyRef) - val schema: Schema = Schema.createRecord("GenericTestRecord", null, null, false) - schema.setFields( - List( - f("int_field", Schema.Type.INT), - f("long_field", Schema.Type.LONG), - f("float_field", Schema.Type.FLOAT), - f("double_field", Schema.Type.DOUBLE), - f("boolean_field", Schema.Type.BOOLEAN), - f("string_field", Schema.Type.STRING), - fArr("array_field", Schema.Type.STRING) - ).asJava - ) + val schema: Schema = SchemaBuilder + .record("GenericTestRecord") + .fields() + .optionalInt("int_field") + .optionalLong("long_field") + .optionalFloat("float_field") + .optionalDouble("double_field") + .optionalBoolean("boolean_field") + .optionalString("string_field") + .name("array_field") + .`type` + .array() + .items() + .stringType() + .noDefault() + .endRecord() - def newGenericRecord(i: Int): GenericRecord = { - val r = new GenericData.Record(schema) - r.put("int_field", 1 * i) - r.put("long_field", 1L * i) - r.put("float_field", 1f * i) - r.put("double_field", 1.0 * i) - r.put("boolean_field", 
true) - r.put("string_field", "hello") - r.put("array_field", List[CharSequence]("a", "b", "c").asJava) - r - } + def newGenericRecord(i: Int): GenericRecord = + new GenericRecordBuilder(schema) + .set("int_field", 1 * i) + .set("long_field", 1L * i) + .set("float_field", 1f * i) + .set("double_field", 1.0 * i) + .set("boolean_field", true) + .set("string_field", "hello") + .set("array_field", List[CharSequence]("a", "b", "c").asJava) + .build() def newSpecificRecord(i: Int): TestRecord = new TestRecord( diff --git a/scio-test/src/test/scala/com/spotify/scio/io/TapTest.scala b/scio-test/src/test/scala/com/spotify/scio/io/TapTest.scala index d78967ab0c..35ed2e1e15 100644 --- a/scio-test/src/test/scala/com/spotify/scio/io/TapTest.scala +++ b/scio-test/src/test/scala/com/spotify/scio/io/TapTest.scala @@ -26,7 +26,7 @@ import com.spotify.scio.proto.SimpleV2.{SimplePB => SimplePBV2} import com.spotify.scio.proto.SimpleV3.{SimplePB => SimplePBV3} import com.spotify.scio.testing.PipelineSpec import org.apache.avro.Schema -import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} import org.apache.beam.sdk.io.Compression import org.apache.beam.sdk.util.SerializableUtils import org.apache.commons.compress.compressors.CompressorStreamFactory @@ -115,10 +115,10 @@ class TapTest extends TapSpec { val tap = runWithFileFuture { _.parallelize(Seq("a", "b", "c")) - .map { s => - val record: GenericRecord = new GenericData.Record(AvroBytesUtil.schema) - record.put("bytes", ByteBuffer.wrap(s.getBytes)) - record + .map[GenericRecord] { s => + new GenericRecordBuilder(AvroBytesUtil.schema) + .set("bytes", ByteBuffer.wrap(s.getBytes)) + .build() } .saveAsAvroFile(dir.getAbsolutePath, schema = AvroBytesUtil.schema) }