Skip to content

Commit

Permalink
Use GenericRecordBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Dec 14, 2023
1 parent 88cc5e1 commit 6f39bf7
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import com.spotify.scio.util.MultiJoin
import com.spotify.scio.values.SCollection
import org.apache.avro.Schema
import org.apache.avro.Schema.Field
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.beam.sdk.extensions.smb.{AvroSortedBucketIO, SortedBucketIO, TargetParallelism}
import org.apache.beam.sdk.values.TupleTag
import org.apache.commons.io.FileUtils
Expand Down Expand Up @@ -213,10 +213,10 @@ class SortMergeBucketParityIT extends AnyFlatSpec with Matchers {

val outputPaths = (0 until numSources).map { n =>
val data = (0 to Random.nextInt(100)).map { i =>
val gr: GenericRecord = new GenericData.Record(schema)
gr.put("key", i)
gr.put("value", s"v$i")
gr
new GenericRecordBuilder(schema)
.set("key", i)
.set("value", s"v$i")
.build()
}

val outputPath = new File(tempFolder, s"source$n")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ import com.spotify.scio.avro._
import com.spotify.scio.avro.types.AvroType
import com.spotify.scio.io.ClosedTap
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericData, GenericRecord}

import scala.jdk.CollectionConverters._
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}

object AvroExample {
@AvroType.fromSchema("""{
Expand Down Expand Up @@ -107,12 +105,12 @@ object AvroExample {
implicit def genericCoder = avroGenericRecordCoder(schema)
sc.parallelize(1 to 100)
.map[GenericRecord] { i =>
val r = new GenericData.Record(schema)
r.put("id", i)
r.put("amount", i.toDouble)
r.put("name", "account" + i)
r.put("type", "checking")
r
new GenericRecordBuilder(schema)
.set("id", i)
.set("amount", i.toDouble)
.set("name", "account" + i)
.set("type", "checking")
.build()
}
.saveAsAvroFile(args("output"), schema = schema)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import com.spotify.scio.coders.Coder
import com.spotify.scio.values.SCollection
import org.apache.avro.Schema
import org.apache.avro.file.CodecFactory
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.beam.sdk.extensions.smb.BucketMetadata.HashType
import org.apache.beam.sdk.extensions.smb.{AvroSortedBucketIO, TargetParallelism}
import org.apache.beam.sdk.values.TupleTag
Expand All @@ -58,13 +58,10 @@ object SortMergeBucketExample {
|""".stripMargin
)

def user(id: String, age: Int): GenericRecord = {
val gr = new GenericData.Record(UserDataSchema)
gr.put("userId", id)
gr.put("age", age)

gr
}
/**
 * Builds a user [[GenericRecord]] conforming to [[UserDataSchema]].
 *
 * @param id  value stored under the schema's "userId" field
 * @param age value stored under the schema's "age" field
 * @return the assembled record
 */
def user(id: String, age: Int): GenericRecord = {
  val builder = new GenericRecordBuilder(UserDataSchema)
  builder.set("userId", id)
  builder.set("age", age)
  builder.build()
}
}

object SortMergeBucketWriteExample {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ package com.spotify.scio.examples.extra
import com.spotify.scio.avro.AvroIO
import com.spotify.scio.io._
import com.spotify.scio.testing._
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}

class MagnolifyAvroExampleTest extends PipelineSpec {
import MagnolifyAvroExample._

val textIn: Seq[String] = Seq("a b c d e", "a b a b")
val wordCount: Seq[(String, Long)] = Seq(("a", 3L), ("b", 3L), ("c", 1L), ("d", 1L), ("e", 1L))
val records: Seq[GenericRecord] = wordCount.map { kv =>
val r = new GenericData.Record(wordCountType.schema)
r.put("word", kv._1)
r.put("count", kv._2)
r
new GenericRecordBuilder(wordCountType.schema)
.set("word", kv._1)
.set("count", kv._2)
.build()
}
val textOut: Seq[String] = wordCount.map(kv => kv._1 + ": " + kv._2)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ package com.spotify.scio.extra.bigquery

import java.math.{BigDecimal => JBigDecimal}
import java.nio.ByteBuffer

import com.google.protobuf.ByteString
import com.spotify.scio.bigquery.TableRow
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.avro.generic.GenericData.EnumSymbol
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding
import org.joda.time.{DateTime, LocalDate, LocalTime}
Expand Down Expand Up @@ -72,29 +71,24 @@ class ToTableRowTest extends AnyFlatSpec with Matchers {
it should "convert a GenericRecord to TableRow" in {
val enumSchema = AvroExample.SCHEMA$.getField("enumField").schema()

val nestedAvro = new GenericData.Record(NestedAvro.SCHEMA$)
nestedAvro.put("nestedField", "nestedValue")
val nestedAvro = new GenericRecordBuilder(NestedAvro.SCHEMA$)
.set("nestedField", "nestedValue")
.build()

val genericRecord = new GenericData.Record(AvroExample.SCHEMA$)
genericRecord.put("booleanField", true)
genericRecord.put("stringField", "someString")
genericRecord.put("doubleField", 1.0)
genericRecord.put("longField", 1L)
genericRecord.put("intField", 1)
genericRecord.put("floatField", 1f)
genericRecord.put(
"bytesField",
ByteBuffer.wrap(ByteString.copyFromUtf8("%20cフーバー").toByteArray)
)
genericRecord.put("arrayField", List(nestedAvro).asJava)
genericRecord.put("unionField", "someUnion")
genericRecord.put(
"mapField",
Map("mapKey" -> 1.0d).asJava
.asInstanceOf[java.util.Map[java.lang.CharSequence, java.lang.Double]]
)
genericRecord.put("enumField", new EnumSymbol(enumSchema, Kind.FOO.toString))
genericRecord.put("fixedField", new fixedType("%20cフーバー".getBytes()))
val genericRecord = new GenericRecordBuilder(AvroExample.SCHEMA$)
.set("booleanField", true)
.set("stringField", "someString")
.set("doubleField", 1.0)
.set("longField", 1L)
.set("intField", 1)
.set("floatField", 1f)
.set("bytesField", ByteBuffer.wrap(ByteString.copyFromUtf8("%20cフーバー").toByteArray))
.set("arrayField", List(nestedAvro).asJava)
.set("unionField", "someUnion")
.set("mapField", Map[CharSequence, java.lang.Double]("mapKey" -> 1.0d).asJava)
.set("enumField", new EnumSymbol(enumSchema, Kind.FOO.toString))
.set("fixedField", new fixedType("%20cフーバー".getBytes()))
.build()

AvroConverters.toTableRow(genericRecord) shouldEqual expectedOutput
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
Expand Down Expand Up @@ -114,19 +114,19 @@ private static <A> List<A> empty() {
}

private static GenericRecord GR(String firstname, String lastname, int age) {
GenericData.Record result = new GenericData.Record(GR_USER_SCHEMA);
result.put(
"firstname",
Optional.ofNullable(firstname)
.map(n -> ByteBuffer.wrap(ByteString.copyFromUtf8(n).toByteArray()))
.orElse(null));
result.put(
"lastname",
Optional.ofNullable(lastname)
.map(n -> ByteBuffer.wrap(ByteString.copyFromUtf8(n).toByteArray()))
.orElse(null));
result.put("age", age);
return result;
return new GenericRecordBuilder(GR_USER_SCHEMA)
.set(
"firstname",
Optional.ofNullable(firstname)
.map(n -> ByteBuffer.wrap(ByteString.copyFromUtf8(n).toByteArray()))
.orElse(null))
.set(
"lastname",
Optional.ofNullable(lastname)
.map(n -> ByteBuffer.wrap(ByteString.copyFromUtf8(n).toByteArray()))
.orElse(null))
.set("age", age)
.build();
}

private static TableRow Json(String firstname, String lastname, String country) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@ public class ParquetAvroSortedBucketIOTest {
public void testReadSerializable() {
final Configuration conf = new Configuration();
AvroReadSupport.setRequestedProjection(
conf,
SchemaBuilder.record("Record")
.fields()
.requiredString("name")
.endRecord());
conf, SchemaBuilder.record("Record").fields().requiredString("name").endRecord());

SerializableUtils.ensureSerializable(
SortedBucketIO.read(String.class)
Expand Down
10 changes: 5 additions & 5 deletions scio-test/src/test/scala/com/spotify/scio/io/TapTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import com.spotify.scio.proto.SimpleV2.{SimplePB => SimplePBV2}
import com.spotify.scio.proto.SimpleV3.{SimplePB => SimplePBV3}
import com.spotify.scio.testing.PipelineSpec
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.beam.sdk.io.Compression
import org.apache.beam.sdk.util.SerializableUtils
import org.apache.commons.compress.compressors.CompressorStreamFactory
Expand Down Expand Up @@ -115,10 +115,10 @@ class TapTest extends TapSpec {

val tap = runWithFileFuture {
_.parallelize(Seq("a", "b", "c"))
.map { s =>
val record: GenericRecord = new GenericData.Record(AvroBytesUtil.schema)
record.put("bytes", ByteBuffer.wrap(s.getBytes))
record
.map[GenericRecord] { s =>
new GenericRecordBuilder(AvroBytesUtil.schema)
.set("bytes", ByteBuffer.wrap(s.getBytes))
.build()
}
.saveAsAvroFile(dir.getAbsolutePath, schema = AvroBytesUtil.schema)
}
Expand Down

0 comments on commit 6f39bf7

Please sign in to comment.