Use avro builder API #5119

Merged: 3 commits, Dec 14, 2023
Changes from 1 commit
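For context, this PR replaces manual Schema.createRecord construction with Avro's fluent SchemaBuilder API throughout scio. A minimal sketch of the two styles, using a hypothetical two-field record rather than one taken from the diff:

import org.apache.avro.{Schema, SchemaBuilder}
import scala.jdk.CollectionConverters._

// Before: manual construction; the field list must be converted to a Java collection.
val manual: Schema = Schema.createRecord(
  "Example", null, "com.example", false,
  List(
    new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null.asInstanceOf[Any]),
    new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null.asInstanceOf[Any])
  ).asJava
)

// After: the fluent builder, as adopted in this PR.
val built: Schema = SchemaBuilder
  .record("Example").namespace("com.example")
  .fields()
  .requiredLong("id")
  .requiredString("name")
  .endRecord()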
@@ -24,7 +24,7 @@ import com.spotify.scio.bigquery.BigQueryTypedTable.Format
import com.spotify.scio.bigquery.client.BigQuery
import com.spotify.scio.testing._
import magnolify.scalacheck.auto._
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.joda.time.{Instant, LocalDate, LocalDateTime, LocalTime}
@@ -166,28 +166,17 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {

it should "write GenericRecord records with logical types" in {
val sc = ScioContext(options)
import scala.jdk.CollectionConverters._
val schema: Schema = Schema.createRecord(
"Record",
"",
"com.spotify.scio.bigquery",
false,
List(
new Schema.Field(
"date",
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)),
"",
0
),
new Schema.Field(
"time",
LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG)),
"",
0L
),
new Schema.Field("datetime", Schema.create(Schema.Type.STRING), "", "")
).asJava
)
// format: off
val schema: Schema = SchemaBuilder
.record("Record")
.namespace("com.spotify.scio.bigquery")
.fields()
.name("date").`type`(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))).withDefault(0)
.name("time").`type`(LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG))).withDefault(0L)
.name("datetime").`type`().stringType().stringDefault("")
.endRecord()
// format: on

implicit val coder = avroGenericRecordCoder(schema)
val ltRecords: Seq[GenericRecord] =
Seq(
@@ -19,7 +19,7 @@ package com.spotify.scio.avro.dynamic.syntax

import com.google.protobuf.Message
import com.spotify.scio.avro.AvroIO
import com.spotify.scio.coders.{AvroBytesUtil, Coder, CoderMaterializer}
import com.spotify.scio.coders.{AvroBytesUtil, Coder}
import com.spotify.scio.io.{ClosedTap, EmptyTap}
import com.spotify.scio.io.dynamic.syntax.DynamicSCollectionOps
import com.spotify.scio.protobuf.util.ProtobufUtil
@@ -142,22 +142,19 @@ final class DynamicProtobufSCollectionOps[T <: Message](private val self: SColle
tempDirectory: String = AvroIO.WriteParam.DefaultTempDirectory,
prefix: String = AvroIO.WriteParam.DefaultPrefix
)(destinationFn: T => String)(implicit ct: ClassTag[T]): ClosedTap[Nothing] = {
val protoCoder = Coder.protoMessageCoder[T]
val elemCoder = CoderMaterializer.beam(self.context, protoCoder)
val avroSchema = AvroBytesUtil.schema
val nm = new JHashMap[String, AnyRef]()
nm.putAll((metadata ++ ProtobufUtil.schemaMetadataOf(ct)).asJava)

if (self.context.isTest) {
throw new NotImplementedError(
"Protobuf file with dynamic destinations cannot be used in a test context"
)
} else {
implicit val protoCoder: Coder[T] = Coder.protoMessageCoder[T]
val nm = new JHashMap[String, AnyRef]()
nm.putAll((metadata ++ ProtobufUtil.schemaMetadataOf(ct)).asJava)

val sink = BAvroIO
.sinkViaGenericRecords(
avroSchema,
(element: T, _: Schema) => AvroBytesUtil.encode(elemCoder, element)
)
.sink(AvroBytesUtil.schema)
.asInstanceOf[BAvroIO.Sink[T]]
.withDatumWriterFactory(AvroBytesUtil.datumWriterFactory)
Comment on lines +155 to +157 (Contributor Author): Small rewrite here to move away from the deprecated sinkViaGenericRecords API. A condensed before/after sketch follows below.

.withCodec(codec)
.withMetadata(nm)
val write = writeDynamic(
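For reference, a condensed before/after of the sink change the comment above describes. This is an illustrative sketch only: it assumes the surrounding method's element type T and is not compilable outside scio, since AvroBytesUtil is package-private.

import org.apache.avro.Schema
import org.apache.beam.sdk.coders.{Coder => BCoder}
import org.apache.beam.sdk.extensions.avro.io.{AvroIO => BAvroIO}
import com.spotify.scio.coders.{AvroBytesUtil, Coder}

def sinks[T: Coder](elemCoder: BCoder[T]): Unit = {
  // Before (deprecated): adapt each element to a GenericRecord via a formatter function.
  val before = BAvroIO.sinkViaGenericRecords(
    AvroBytesUtil.schema,
    (element: T, _: Schema) => AvroBytesUtil.encode(elemCoder, element)
  )

  // After: a plain sink over the wrapper schema; the datum writer factory
  // performs the same encode step inside the writer instead.
  val after = BAvroIO
    .sink(AvroBytesUtil.schema)
    .asInstanceOf[BAvroIO.Sink[T]]
    .withDatumWriterFactory(AvroBytesUtil.datumWriterFactory)
}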
@@ -17,41 +17,47 @@

package com.spotify.scio.coders

import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.{Schema => ASchema}
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.{DatumWriter, Encoder}
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.beam.sdk.coders.{Coder => BCoder}
import org.apache.beam.sdk.extensions.avro.io.AvroSink.DatumWriterFactory
import org.apache.beam.sdk.util.CoderUtils

import java.nio.ByteBuffer
import scala.jdk.CollectionConverters._

private[scio] object AvroBytesUtil {
val schema: ASchema = {
val s = ASchema.createRecord("AvroBytesRecord", null, null, false)
s.setFields(
List(
new ASchema.Field(
"bytes",
ASchema.create(ASchema.Type.BYTES),
null,
null.asInstanceOf[Object]
)
).asJava
)
s
val schema: Schema = SchemaBuilder
.record("AvroBytesRecord")
.fields()
.requiredBytes("bytes")
.endRecord()

private val byteField = schema.getField("bytes")

def datumWriterFactory[T: Coder]: DatumWriterFactory[T] = {
val bCoder = CoderMaterializer.beamWithDefault(Coder[T])
Comment (Contributor Author): Logic change here, since we no longer use the pipeline coder options for byte serialization. I think the old behavior caused an issue when consuming data from Pub/Sub: with nullable coders turned on, records could not be read. An illustrative sketch of the two materialization calls follows this file's diff.

(schema: Schema) =>
new DatumWriter[T] {
private val underlying = new GenericDatumWriter[GenericRecord](schema)

override def setSchema(schema: Schema): Unit = underlying.setSchema(schema)

override def write(datum: T, out: Encoder): Unit =
underlying.write(AvroBytesUtil.encode(bCoder, datum), out)
}
}

def encode[T](coder: BCoder[T], obj: T): GenericRecord = {
val bytes = CoderUtils.encodeToByteArray(coder, obj)
val record = new GenericData.Record(schema)
record.put("bytes", ByteBuffer.wrap(bytes))
record
new GenericRecordBuilder(schema)
.set(byteField, ByteBuffer.wrap(bytes))
.build()
}

def decode[T](coder: BCoder[T], record: GenericRecord): T = {
val bb = record.get("bytes").asInstanceOf[ByteBuffer]
val bytes =
java.util.Arrays.copyOfRange(bb.array(), bb.position(), bb.limit())
val bb = record.get(byteField.pos()).asInstanceOf[ByteBuffer]
val bytes = java.util.Arrays.copyOfRange(bb.array(), bb.position(), bb.limit())
CoderUtils.decodeFromByteArray(coder, bytes)
}
}
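To illustrate the coder-options comment above: the removed code materialized the element coder against the pipeline context, so pipeline-level options such as nullable coders could change the serialized byte layout, while the new datumWriterFactory pins the default materialization. A minimal sketch of the two calls, assuming scio's CoderMaterializer API as it appears in this diff:

import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}

def materialize[T: Coder](sc: ScioContext): Unit = {
  // Before: honors the pipeline's coder options (e.g. nullable coders), so the
  // byte layout can differ between the writing and the consuming pipeline.
  val optionSensitive = CoderMaterializer.beam(sc, Coder[T])

  // After: always materializes with default options, giving a stable layout
  // regardless of how the consuming pipeline is configured.
  val pinned = CoderMaterializer.beamWithDefault(Coder[T])
}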
@@ -19,7 +19,8 @@ package com.spotify.scio.avro.types

import com.spotify.scio.avro.types.Schemas._
import com.spotify.scio.avro.types.Schemas.FieldMode._
import org.apache.avro.Schema
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.beam.model.pipeline.v1.SchemaApi.SchemaOrBuilder
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

@@ -149,16 +150,11 @@ class SchemaUtilTest extends AnyFlatSpec with Matchers {
val expectedFields = SchemaUtil.scalaReservedWords
.map(e => s"`$e`")
.mkString(start = "", sep = ": Long, ", end = ": Long")
val schema =
Schema.createRecord(
"Row",
null,
null,
false,
SchemaUtil.scalaReservedWords.map { name =>
new Schema.Field(name, Schema.create(Schema.Type.LONG), null, null.asInstanceOf[Any])
}.asJava
)

val schema = SchemaUtil.scalaReservedWords
.foldLeft(SchemaBuilder.record("Row").fields())(_.requiredLong(_))
.endRecord()

SchemaUtil.toPrettyString1(schema) shouldBe s"case class Row($expectedFields)"
}
}
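The foldLeft above threads Avro's FieldAssembler through each reserved word, which keeps the builder chain fluent over a dynamic field list. A standalone sketch of the same idiom, with hypothetical field names:

import org.apache.avro.SchemaBuilder

// Each step returns the FieldAssembler, so foldLeft can accumulate fields.
val fieldNames = List("a", "b", "c")
val rowSchema = fieldNames
  .foldLeft(SchemaBuilder.record("Row").fields())(_.requiredLong(_))
  .endRecord()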
@@ -27,7 +27,7 @@ import com.spotify.scio._
import com.spotify.scio.avro._
import com.spotify.scio.avro.types.AvroType
import com.spotify.scio.io.ClosedTap
import org.apache.avro.Schema
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericData, GenericRecord}

import scala.jdk.CollectionConverters._
@@ -133,24 +133,12 @@ object AvroExample {
.map(_.toString)
.saveAsTextFile(args("output"))

val schema: Schema = {
def f(name: String, tpe: Schema.Type) =
new Schema.Field(
name,
Schema.createUnion(List(Schema.create(Schema.Type.NULL), Schema.create(tpe)).asJava),
null: String,
null: AnyRef
)

val s = Schema.createRecord("GenericAccountRecord", null, null, false)
s.setFields(
List(
f("id", Schema.Type.INT),
f("amount", Schema.Type.DOUBLE),
f("name", Schema.Type.STRING),
f("type", Schema.Type.STRING)
).asJava
)
s
}
val schema: Schema = SchemaBuilder
.record("GenericAccountRecord")
.fields()
.requiredInt("id")
.requiredDouble("amount")
.requiredString("name")
.requiredString("type")
.endRecord()
}
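For context, a hypothetical write path using the generic-record schema above. It assumes scio's avroGenericRecordCoder and saveAsAvroFile as used elsewhere in this PR; the field values are made up:

import com.spotify.scio.ScioContext
import com.spotify.scio.avro._
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}

def writeExample(sc: ScioContext, output: String): Unit = {
  implicit val coder = avroGenericRecordCoder(schema)
  val record: GenericRecord = new GenericRecordBuilder(schema)
    .set("id", 1)
    .set("amount", 10.5)
    .set("name", "account")
    .set("type", "checking")
    .build()
  sc.parallelize(Seq(record)).saveAsAvroFile(output, schema = schema)
}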
@@ -29,6 +29,7 @@
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
@@ -38,7 +39,6 @@
import org.apache.beam.sdk.extensions.avro.io.AvroGeneratedUser;
import org.apache.beam.sdk.extensions.smb.BucketMetadata.HashType;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
@@ -47,42 +47,43 @@
public class AvroBucketMetadataTest {

static final Schema LOCATION_SCHEMA =
Schema.createRecord(
"Location",
"",
"org.apache.beam.sdk.extensions.smb.avro",
false,
Lists.newArrayList(
new Schema.Field("countryId", Schema.create(Type.BYTES), "", ""),
new Schema.Field(
"postalCode",
Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.BYTES)),
"",
""),
new Schema.Field(
"prevCountries",
Schema.createArray(Schema.create(Schema.Type.STRING)),
"",
Collections.<String>emptyList())));

static final Schema LOCATION_UNION_SCHEMA =
Schema.createUnion(Schema.create(Type.NULL), LOCATION_SCHEMA);

SchemaBuilder.record("Location")
.namespace("org.apache.beam.sdk.extensions.smb.avro")
.fields()
.requiredBytes("countryId")
.optionalBytes("postalCode")
.name("prevCountries")
.type()
.array()
.items()
.stringType()
.arrayDefault(Collections.emptyList())
.endRecord();
static final Schema RECORD_SCHEMA =
Schema.createRecord(
"Record",
"",
"org.apache.beam.sdk.extensions.smb.avro",
false,
Lists.newArrayList(
new Schema.Field("id", Schema.create(Schema.Type.LONG), "", 0L),
new Schema.Field("location", LOCATION_SCHEMA, "", Collections.emptyList()),
new Schema.Field("locationUnion", LOCATION_UNION_SCHEMA, "", Collections.emptyList()),
new Schema.Field(
"suffix",
Schema.createEnum("Suffix", "", "", Lists.newArrayList("Jr", "Sr", "None")),
"",
"None")));
SchemaBuilder.record("Record")
.namespace("org.apache.beam.sdk.extensions.smb.avro")
.fields()
.name("id")
.type()
.longType()
.longDefault(0L)
.name("location")
.type(LOCATION_SCHEMA)
.noDefault()
.name("locationUnion")
.type()
.unionOf()
.nullType()
.and()
.type(LOCATION_SCHEMA)
.endUnion()
.noDefault()
.name("suffix")
.type()
.enumeration("Suffix")
.symbols("Jr", "Sr", "None")
.enumDefault("None")
.endRecord();

@Test
public void testGenericRecord() throws Exception {
@@ -480,12 +481,12 @@ public void skipsNullSecondaryKeys()
private static Schema createUnionRecordOfTypes(Schema.Type... types) {
final List<Schema> typeSchemas = new ArrayList<>();
Arrays.asList(types).forEach(t -> typeSchemas.add(Schema.create(t)));
return Schema.createRecord(
"Record",
"",
"org.apache.beam.sdk.extensions.smb.avro",
false,
Lists.newArrayList(
new Schema.Field("unionField", Schema.createUnion(typeSchemas), "", "")));
return SchemaBuilder.record("Record")
.namespace("org.apache.beam.sdk.extensions.smb.avro")
.fields()
.name("unionField")
.type(Schema.createUnion(typeSchemas))
.noDefault()
.endRecord();
}
}