diff --git a/scio-parquet/src/test/scala/com/spotify/scio/parquet/avro/ParquetAvroIOTest.scala b/scio-parquet/src/test/scala/com/spotify/scio/parquet/avro/ParquetAvroIOTest.scala
index 5ce7bf65d0..3dee5dbf05 100644
--- a/scio-parquet/src/test/scala/com/spotify/scio/parquet/avro/ParquetAvroIOTest.scala
+++ b/scio-parquet/src/test/scala/com/spotify/scio/parquet/avro/ParquetAvroIOTest.scala
@@ -217,127 +217,149 @@ class ParquetAvroIOTest extends ScioIOSpec with TapSpec with BeforeAndAfterAll {
   }
 
   it should "write and read GenericRecords with default logical types" in withTempDir { dir =>
-    val records: Seq[GenericRecord] = (1 to 10).map { _ =>
-      val gr = new GenericRecordBuilder(TestLogicalTypes.SCHEMA$)
-      gr.set("timestamp", DateTime.now())
-      gr.set(
-        "decimal",
-        BigDecimal.decimal(1.0).setScale(2).bigDecimal
-      )
-      gr.build()
-    }
+    forAllCases(readConfigs) { case (readConf, testCase) =>
+      val records: Seq[GenericRecord] = (1 to 10).map { _ =>
+        val gr = new GenericRecordBuilder(TestLogicalTypes.SCHEMA$)
+        gr.set("timestamp", DateTime.now())
+        gr.set(
+          "decimal",
+          BigDecimal.decimal(1.0).setScale(2).bigDecimal
+        )
+        gr.build()
+      }
 
-    implicit val coder = {
-      GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion)
-      GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion)
-      avroGenericRecordCoder(TestLogicalTypes.SCHEMA$)
-    }
+      implicit val coder = {
+        GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion)
+        GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion)
+        avroGenericRecordCoder(TestLogicalTypes.SCHEMA$)
+      }
 
-    val sc1 = ScioContext()
-    sc1
-      .parallelize(records)
-      .saveAsParquetAvroFile(
-        path = dir.getAbsolutePath,
-        schema = TestLogicalTypes.SCHEMA$,
-        conf = ParquetConfiguration.of(
-          AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
+      val sc1 = ScioContext()
+      sc1
+        .parallelize(records)
+        .saveAsParquetAvroFile(
+          path = dir.getAbsolutePath + "/" + testCase,
+          schema = TestLogicalTypes.SCHEMA$,
+          conf = ParquetConfiguration.of(
+            AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
+          )
         )
-      )
-    sc1.run()
+      sc1.run()
 
-    val sc2 = ScioContext()
-    sc2
-      .parquetAvroFile[GenericRecord](
-        path = dir.getAbsolutePath,
-        projection = TestLogicalTypes.SCHEMA$,
-        conf = ParquetConfiguration.of(
-          AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
-        ),
-        suffix = ".parquet"
+      val sc2 = ScioContext()
+      val c = readConf()
+      c.setClass(
+        AvroReadSupport.AVRO_DATA_SUPPLIER,
+        classOf[LogicalTypeSupplier],
+        classOf[LogicalTypeSupplier]
       )
-      .map(identity) should containInAnyOrder(records)
+      sc2
+        .parquetAvroFile[GenericRecord](
+          path = dir.getAbsolutePath + "/" + testCase,
+          projection = TestLogicalTypes.SCHEMA$,
+          conf = c,
+          suffix = ".parquet"
+        )
+        .map(identity) should containInAnyOrder(records)
 
-    sc2.run()
+      sc2.run()
+    }
   }
 
   it should "write and read SpecificRecords with custom logical types" in withTempDir { dir =>
-    val records =
-      (1 to 10).map(_ =>
-        TestLogicalTypes
-          .newBuilder()
-          .setTimestamp(DateTime.now())
-          .setDecimal(BigDecimal.decimal(1.0).setScale(2).bigDecimal)
-          .build()
-      )
+    forAllCases(readConfigs) { case (readConf, testCase) =>
+      val records =
+        (1 to 10).map(_ =>
+          TestLogicalTypes
+            .newBuilder()
+            .setTimestamp(DateTime.now())
+            .setDecimal(BigDecimal.decimal(1.0).setScale(2).bigDecimal)
+            .build()
+        )
 
-    val sc1 = ScioContext()
-    sc1
-      .parallelize(records)
-      .saveAsParquetAvroFile(
-        path = dir.getAbsolutePath,
-        conf = ParquetConfiguration.of(
-          AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[CustomLogicalTypeSupplier]
+      val sc1 = ScioContext()
+      sc1
+        .parallelize(records)
+        .saveAsParquetAvroFile(
+          path = dir.getAbsolutePath + "/" + testCase,
+          conf = ParquetConfiguration.of(
+            AvroWriteSupport.AVRO_DATA_SUPPLIER -> classOf[CustomLogicalTypeSupplier]
+          )
         )
-      )
-    sc1.run()
+      sc1.run()
 
-    val sc2 = ScioContext()
-    sc2
-      .parquetAvroFile[TestLogicalTypes](
-        path = dir.getAbsolutePath,
-        conf = ParquetConfiguration.of(
-          AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[CustomLogicalTypeSupplier]
-        ),
-        suffix = ".parquet"
+      val sc2 = ScioContext()
+      val c = readConf()
+      c.setClass(
+        AvroReadSupport.AVRO_DATA_SUPPLIER,
+        classOf[CustomLogicalTypeSupplier],
+        classOf[CustomLogicalTypeSupplier]
       )
-      .map(identity) should containInAnyOrder(records)
+      sc2
+        .parquetAvroFile[TestLogicalTypes](
+          path = dir.getAbsolutePath + "/" + testCase,
+          conf = c,
+          suffix = ".parquet"
+        )
+        .map(identity) should containInAnyOrder(records)
 
-    sc2.run()
-    ()
+      sc2.run()
+      ()
+    }
   }
 
   it should "read with incomplete projection" in withTempDir { dir =>
-    val sc1 = ScioContext()
-    val nestedRecords =
-      (1 to 10).map(x => new Account(x, x.toString, x.toString, x.toDouble, AccountStatus.Active))
-    sc1
-      .parallelize(nestedRecords)
-      .saveAsParquetAvroFile(dir.getAbsolutePath)
-    sc1.run()
+    forAllCases(readConfigs) { case (readConf, testCase) =>
+      val sc1 = ScioContext()
+      val nestedRecords =
+        (1 to 10).map(x => new Account(x, x.toString, x.toString, x.toDouble, AccountStatus.Active))
+      sc1
+        .parallelize(nestedRecords)
+        .saveAsParquetAvroFile(dir.getAbsolutePath + "/" + testCase)
+      sc1.run()
 
-    val sc2 = ScioContext()
-    val projection = Projection[Account](_.getName)
-    val data = sc2.parquetAvroFile[Account](
-      path = dir.getAbsolutePath,
-      projection = projection,
-      suffix = ".parquet"
-    )
-    val expected = nestedRecords.map(_.getName.toString)
-    data.map(_.getName.toString) should containInAnyOrder(expected)
-    data.flatMap(a => Some(a.getName.toString)) should containInAnyOrder(expected)
-    sc2.run()
+      val sc2 = ScioContext()
+      val projection = Projection[Account](_.getName)
+      val data = sc2.parquetAvroFile[Account](
+        path = dir.getAbsolutePath + "/" + testCase,
+        projection = projection,
+        conf = readConf(),
+        suffix = ".parquet"
+      )
+      val expected = nestedRecords.map(_.getName.toString)
+      data.map(_.getName.toString) should containInAnyOrder(expected)
+      data.flatMap(a => Some(a.getName.toString)) should containInAnyOrder(expected)
+      sc2.run()
+    }
   }
 
   it should "read/write generic records" in withTempDir { dir =>
-    val genericRecords = (1 to 100).map(AvroUtils.newGenericRecord)
-    val sc1 = ScioContext()
-    implicit val coder = avroGenericRecordCoder(AvroUtils.schema)
-    sc1
-      .parallelize(genericRecords)
-      .saveAsParquetAvroFile(dir.getAbsolutePath, numShards = 1, schema = AvroUtils.schema)
-    sc1.run()
+    forAllCases(readConfigs) { case (readConf, testCase) =>
+      val genericRecords = (1 to 100).map(AvroUtils.newGenericRecord)
+      val sc1 = ScioContext()
+      implicit val coder = avroGenericRecordCoder(AvroUtils.schema)
+      sc1
+        .parallelize(genericRecords)
+        .saveAsParquetAvroFile(
+          dir.getAbsolutePath + "/" + testCase,
+          numShards = 1,
+          schema = AvroUtils.schema
+        )
+      sc1.run()
 
-    val files = dir.listFiles()
-    files.length shouldBe 1
+      val files = dir.listFiles()
+      files.length shouldBe 1
 
-    val sc2 = ScioContext()
-    val data: SCollection[GenericRecord] = sc2.parquetAvroFile[GenericRecord](
-      path = dir.getAbsolutePath,
-      projection = AvroUtils.schema,
-      suffix = ".parquet"
-    )
-    data should containInAnyOrder(genericRecords)
-    sc2.run()
+      val sc2 = ScioContext()
+      val data: SCollection[GenericRecord] = sc2.parquetAvroFile[GenericRecord](
+        path = dir.getAbsolutePath + "/" + testCase,
+        projection = AvroUtils.schema,
+        conf = readConf(),
+        suffix = ".parquet"
+      )
+      data should containInAnyOrder(genericRecords)
+      sc2.run()
+    }
   }
 
   class TestRecordProjection private (str: String) {}