Added more test cases
shnapz committed Jan 17, 2024
1 parent f057924 commit 0cbfe14
Showing 1 changed file with 120 additions and 98 deletions.
@@ -217,127 +217,149 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
}

it should "write and read GenericRecords with default logical types" in withTempDir { dir =>
-  val records: Seq[GenericRecord] = (1 to 10).map { _ =>
-    val gr = new GenericRecordBuilder(TestLogicalTypes.SCHEMA$)
-    gr.set("timestamp", DateTime.now())
-    gr.set(
-      "decimal",
-      BigDecimal.decimal(1.0).setScale(2).bigDecimal
-    )
-    gr.build()
-  }
+  forAllCases(readConfigs) { case (readConf, testCase) =>
+    val records: Seq[GenericRecord] = (1 to 10).map { _ =>
+      val gr = new GenericRecordBuilder(TestLogicalTypes.SCHEMA$)
+      gr.set("timestamp", DateTime.now())
+      gr.set(
+        "decimal",
+        BigDecimal.decimal(1.0).setScale(2).bigDecimal
+      )
+      gr.build()
+    }

-  implicit val coder = {
-    GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion)
-    GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion)
-    avroGenericRecordCoder(TestLogicalTypes.SCHEMA$)
-  }
+    implicit val coder = {
+      GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion)
+      GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion)
+      avroGenericRecordCoder(TestLogicalTypes.SCHEMA$)
+    }

-  val sc1 = ScioContext()
-  sc1
-    .parallelize(records)
-    .saveAsParquetAvroFile(
-      path = dir.getAbsolutePath,
-      schema = TestLogicalTypes.SCHEMA$,
-      conf = ParquetConfiguration.of(
-        AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
-      )
-    )
-  sc1.run()
+    val sc1 = ScioContext()
+    sc1
+      .parallelize(records)
+      .saveAsParquetAvroFile(
+        path = dir.getAbsolutePath + "/" + testCase,
+        schema = TestLogicalTypes.SCHEMA$,
+        conf = ParquetConfiguration.of(
+          AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
+        )
+      )
+    sc1.run()

-  val sc2 = ScioContext()
-  sc2
-    .parquetAvroFile[GenericRecord](
-      path = dir.getAbsolutePath,
-      projection = TestLogicalTypes.SCHEMA$,
-      conf = ParquetConfiguration.of(
-        AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
-      ),
-      suffix = ".parquet"
-    )
-    .map(identity) should containInAnyOrder(records)
+    val sc2 = ScioContext()
+    val c = readConf()
+    c.setClass(
+      AvroReadSupport.AVRO_DATA_SUPPLIER,
+      classOf[LogicalTypeSupplier],
+      classOf[LogicalTypeSupplier]
+    )
+    sc2
+      .parquetAvroFile[GenericRecord](
+        path = dir.getAbsolutePath + "/" + testCase,
+        projection = TestLogicalTypes.SCHEMA$,
+        conf = c,
+        suffix = ".parquet"
+      )
+      .map(identity) should containInAnyOrder(records)

-  sc2.run()
+    sc2.run()
+  }
}
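The updated tests lean on two helpers, readConfigs and forAllCases, which live elsewhere in ParquetAvroIOTest and are not part of this hunk. The sketch below is only an assumption about their shape, inferred from how the diff uses them (a sequence of Configuration factories paired with labels used as per-case output subdirectories); the helper bodies and the config key shown are hypothetical, not the actual definitions.

import org.apache.hadoop.conf.Configuration

// Sketch only: each case pairs a factory for a fresh Hadoop Configuration
// with a label that the tests append to the temp dir as a subdirectory.
val readConfigs: Seq[(() => Configuration, String)] = Seq(
  (() => new Configuration(false), "default-read"),
  (
    () => {
      val c = new Configuration(false)
      // Hypothetical key; the real cases exercise settings not shown here.
      c.setBoolean("scio.parquet.read.useSplittableDoFn", true)
      c
    },
    "splittable-read"
  )
)

// Runs the test body once per (config factory, label) pair.
def forAllCases[A](cases: Seq[(() => Configuration, String)])(
  body: ((() => Configuration, String)) => A
): Unit = cases.foreach(body)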

it should "write and read SpecificRecords with custom logical types" in withTempDir { dir =>
-  val records =
-    (1 to 10).map(_ =>
-      TestLogicalTypes
-        .newBuilder()
-        .setTimestamp(DateTime.now())
-        .setDecimal(BigDecimal.decimal(1.0).setScale(2).bigDecimal)
-        .build()
-    )
+  forAllCases(readConfigs) { case (readConf, testCase) =>
+    val records =
+      (1 to 10).map(_ =>
+        TestLogicalTypes
+          .newBuilder()
+          .setTimestamp(DateTime.now())
+          .setDecimal(BigDecimal.decimal(1.0).setScale(2).bigDecimal)
+          .build()
+      )

-  val sc1 = ScioContext()
-  sc1
-    .parallelize(records)
-    .saveAsParquetAvroFile(
-      path = dir.getAbsolutePath,
-      conf = ParquetConfiguration.of(
-        AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[CustomLogicalTypeSupplier]
-      )
-    )
-  sc1.run()
+    val sc1 = ScioContext()
+    sc1
+      .parallelize(records)
+      .saveAsParquetAvroFile(
+        path = dir.getAbsolutePath + "/" + testCase,
+        conf = ParquetConfiguration.of(
+          AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[CustomLogicalTypeSupplier]
+        )
+      )
+    sc1.run()

-  val sc2 = ScioContext()
-  sc2
-    .parquetAvroFile[TestLogicalTypes](
-      path = dir.getAbsolutePath,
-      conf = ParquetConfiguration.of(
-        AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[CustomLogicalTypeSupplier]
-      ),
-      suffix = ".parquet"
-    )
-    .map(identity) should containInAnyOrder(records)
+    val sc2 = ScioContext()
+    val c = readConf()
+    c.setClass(
+      AvroReadSupport.AVRO_DATA_SUPPLIER,
+      classOf[CustomLogicalTypeSupplier],
+      classOf[CustomLogicalTypeSupplier]
+    )
+    sc2
+      .parquetAvroFile[TestLogicalTypes](
+        path = dir.getAbsolutePath + "/" + testCase,
+        conf = c,
+        suffix = ".parquet"
+      )
+      .map(identity) should containInAnyOrder(records)

-  sc2.run()
-  ()
+    sc2.run()
+    ()
+  }
}
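A note on the read-side pattern in both logical-type tests: instead of building a fresh read Configuration with ParquetConfiguration.of, the new code starts from the per-case readConf() and mutates it, so the case's own read settings survive alongside the data supplier. setClass is standard Hadoop Configuration API. A minimal standalone illustration, assuming the test file's existing imports for LogicalTypeSupplier; the property name below is made up:

import org.apache.hadoop.conf.Configuration

val c = new Configuration(false)
// setClass(name, theClass, xface) stores theClass under the property `name`
// and checks that theClass is assignable to xface; passing the same class
// for both makes the check trivially pass.
c.setClass("example.data.supplier", classOf[LogicalTypeSupplier], classOf[LogicalTypeSupplier])
// Readers recover it later with the matching getClass overload.
val supplier = c.getClass("example.data.supplier", classOf[LogicalTypeSupplier], classOf[LogicalTypeSupplier])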

it should "read with incomplete projection" in withTempDir { dir =>
-  val sc1 = ScioContext()
-  val nestedRecords =
-    (1 to 10).map(x => new Account(x, x.toString, x.toString, x.toDouble, AccountStatus.Active))
-  sc1
-    .parallelize(nestedRecords)
-    .saveAsParquetAvroFile(dir.getAbsolutePath)
-  sc1.run()
+  forAllCases(readConfigs) { case (readConf, testCase) =>
+    val sc1 = ScioContext()
+    val nestedRecords =
+      (1 to 10).map(x => new Account(x, x.toString, x.toString, x.toDouble, AccountStatus.Active))
+    sc1
+      .parallelize(nestedRecords)
+      .saveAsParquetAvroFile(dir.getAbsolutePath + "/" + testCase)
+    sc1.run()

-  val sc2 = ScioContext()
-  val projection = Projection[Account](_.getName)
-  val data = sc2.parquetAvroFile[Account](
-    path = dir.getAbsolutePath,
-    projection = projection,
-    suffix = ".parquet"
-  )
-  val expected = nestedRecords.map(_.getName.toString)
-  data.map(_.getName.toString) should containInAnyOrder(expected)
-  data.flatMap(a => Some(a.getName.toString)) should containInAnyOrder(expected)
-  sc2.run()
+    val sc2 = ScioContext()
+    val projection = Projection[Account](_.getName)
+    val data = sc2.parquetAvroFile[Account](
+      path = dir.getAbsolutePath + "/" + testCase,
+      projection = projection,
+      conf = readConf(),
+      suffix = ".parquet"
+    )
+    val expected = nestedRecords.map(_.getName.toString)
+    data.map(_.getName.toString) should containInAnyOrder(expected)
+    data.flatMap(a => Some(a.getName.toString)) should containInAnyOrder(expected)
+    sc2.run()
+  }
}
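For context on what this test exercises: Projection[Account](_.getName) produces a narrowed Avro reader schema so that Parquet only materializes the selected columns, and the unprojected fields of each Account are simply not populated, which is why the assertions touch only getName. The following is a hand-rolled sketch of such a projection schema built with plain Avro APIs; it is an assumption about the idea behind the macro, not its actual implementation:

import org.apache.avro.Schema
import scala.jdk.CollectionConverters._

// Sketch: cut a record schema down to a subset of its fields.
def projectSchema(full: Schema, keep: Set[String]): Schema = {
  val fields = full.getFields.asScala.collect {
    case f if keep(f.name()) =>
      new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())
  }
  Schema.createRecord(full.getName, full.getDoc, full.getNamespace, false, fields.asJava)
}

// e.g. projectSchema(Account.getClassSchema, Set("name"))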

it should "read/write generic records" in withTempDir { dir =>
-  val genericRecords = (1 to 100).map(AvroUtils.newGenericRecord)
-  val sc1 = ScioContext()
-  implicit val coder = avroGenericRecordCoder(AvroUtils.schema)
-  sc1
-    .parallelize(genericRecords)
-    .saveAsParquetAvroFile(dir.getAbsolutePath, numShards = 1, schema = AvroUtils.schema)
-  sc1.run()
+  forAllCases(readConfigs) { case (readConf, testCase) =>
+    val genericRecords = (1 to 100).map(AvroUtils.newGenericRecord)
+    val sc1 = ScioContext()
+    implicit val coder = avroGenericRecordCoder(AvroUtils.schema)
+    sc1
+      .parallelize(genericRecords)
+      .saveAsParquetAvroFile(
+        dir.getAbsolutePath + "/" + testCase,
+        numShards = 1,
+        schema = AvroUtils.schema
+      )
+    sc1.run()

-  val files = dir.listFiles()
-  files.length shouldBe 1
+    val files = dir.listFiles()
+    files.length shouldBe 1

-  val sc2 = ScioContext()
-  val data: SCollection[GenericRecord] = sc2.parquetAvroFile[GenericRecord](
-    path = dir.getAbsolutePath,
-    projection = AvroUtils.schema,
-    suffix = ".parquet"
-  )
-  data should containInAnyOrder(genericRecords)
-  sc2.run()
+    val sc2 = ScioContext()
+    val data: SCollection[GenericRecord] = sc2.parquetAvroFile[GenericRecord](
+      path = dir.getAbsolutePath + "/" + testCase,
+      projection = AvroUtils.schema,
+      conf = readConf(),
+      suffix = ".parquet"
+    )
+    data should containInAnyOrder(genericRecords)
+    sc2.run()
+  }
}
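One detail worth noting in this last test: GenericRecord carries no schema at the type level, so a Coder[GenericRecord] cannot be derived from the type alone and must be built from an explicit schema, which is what the implicit declaration in the diff does. Stated as background on the pattern rather than as part of the change:

// Without this implicit, parallelize[GenericRecord] has no schema-aware
// coder to pick up; avroGenericRecordCoder constructs one from the schema.
implicit val coder = avroGenericRecordCoder(AvroUtils.schema)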

class TestRecordProjection private (str: String) {}
