Commit f057924

fix 3

shnapz committed Jan 17, 2024
1 parent e932cac

Showing 2 changed files with 13 additions and 27 deletions.
File 1 of 2:
@@ -101,7 +101,6 @@ public static <V extends IndexedRecord> ParquetAvroFileOperations<V> of(
   public static <V extends IndexedRecord> ParquetAvroFileOperations<V> of(
       Schema schema, CompressionCodecName compression, FilterPredicate predicate,
       Configuration conf) {
-
     return of(schema, null, compression, predicate, conf);
   }

@@ -110,14 +109,6 @@ public static <V extends IndexedRecord> ParquetAvroFileOperations<V> of(
       CompressionCodecName compression,
       FilterPredicate predicate,
       Configuration conf) {
-    if (recordClass == null && conf.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null) {
-      conf.setClass(
-          AvroReadSupport.AVRO_DATA_SUPPLIER,
-          GenericDataSupplier.class,
-          AvroDataSupplier.class
-      );
-    }
-
     return new ParquetAvroFileOperations<>(schema, recordClass, compression, conf, predicate);
   }

@@ -158,7 +149,7 @@ public void populateDisplayData(DisplayData.Builder builder) {

   @Override
   protected Reader<ValueT> createReader() {
-    return new ParquetAvroReader<>(schemaSupplier, conf, predicate);
+    return new ParquetAvroReader<>(schemaSupplier, conf, predicate, recordClass);
   }

   @Override
@@ -186,16 +177,19 @@ private static class ParquetAvroReader<ValueT> extends FileOperations.Reader<ValueT> {
     private final SerializableSchemaSupplier schemaSupplier;
     private final SerializableConfiguration conf;
     private final FilterPredicate predicate;
+    private final Class<ValueT> recordClass;
     private transient ParquetReader<ValueT> reader;
     private transient ValueT current;

     private ParquetAvroReader(
         SerializableSchemaSupplier schemaSupplier,
         SerializableConfiguration conf,
-        FilterPredicate predicate) {
+        FilterPredicate predicate,
+        Class<ValueT> recordClass) {
       this.schemaSupplier = schemaSupplier;
       this.conf = conf;
       this.predicate = predicate;
+      this.recordClass = recordClass;
     }

     @Override
@@ -209,6 +203,14 @@ public void prepareRead(ReadableByteChannel channel) throws IOException {
         AvroReadSupport.setRequestedProjection(configuration, schema);
       }

+      if (recordClass == null && configuration.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null) {
+        configuration.setClass(
+            AvroReadSupport.AVRO_DATA_SUPPLIER,
+            GenericDataSupplier.class,
+            AvroDataSupplier.class
+        );
+      }
+
       ParquetReader.Builder<ValueT> builder =
           AvroParquetReader.<ValueT>builder(new ParquetInputFile(channel)).withConf(configuration);
       if (predicate != null) {
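
The hunk above moves the AVRO_DATA_SUPPLIER defaulting into the reader's prepareRead. For readers unfamiliar with that parquet-avro setting, here is a minimal standalone sketch of the same configuration pattern used directly against AvroParquetReader; the GenericParquetReadSketch class name and the users.parquet path are illustrative assumptions, not part of this change.

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroDataSupplier;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.GenericDataSupplier;
import org.apache.parquet.hadoop.ParquetReader;

public class GenericParquetReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Mirror the null check from the hunk above: only install the generic
    // supplier when the caller has not configured one explicitly.
    if (conf.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null) {
      conf.setClass(
          AvroReadSupport.AVRO_DATA_SUPPLIER, GenericDataSupplier.class, AvroDataSupplier.class);
    }

    // "users.parquet" is a placeholder path for this sketch.
    try (ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(new Path("users.parquet"))
            .withConf(conf)
            .build()) {
      GenericRecord record;
      while ((record = reader.read()) != null) {
        System.out.println(record);
      }
    }
  }
}

The null checks follow the same precedence as the diff: an explicitly configured supplier, or a concrete record class, wins over the GenericDataSupplier default.
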
File 2 of 2:
@@ -102,22 +102,6 @@ public void testGenericRecord() throws Exception {
     Assert.assertEquals(USER_RECORDS, actual);
   }

-  @Test
-  public void testDataSupplierIsSetForGenericRecord() throws Exception {
-    final ResourceId file =
-        fromFolder(output)
-            .resolve("file.parquet", ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
-    writeFile(file);
-
-    final ParquetAvroFileOperations<GenericRecord> fileOperations =
-        ParquetAvroFileOperations.of(USER_SCHEMA);
-
-    final List<GenericRecord> actual = new ArrayList<>();
-    fileOperations.iterator(file).forEachRemaining(actual::add);
-
-    Assert.assertEquals(USER_RECORDS, actual);
-  }
-
   @Test
   public void testSpecificRecord() throws Exception {
     final ParquetAvroFileOperations<AvroGeneratedUser> fileOperations =
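
The deleted test above covered the generic-record round trip through the public factory. As a usage sketch of how callers still reach the supplier defaulting that now lives in prepareRead, the same flow looks roughly like this; USER_SCHEMA, USER_RECORDS, writeFile, fromFolder, and output are fixtures of the test class shown above and are assumed to be in scope, so this is a fragment rather than a self-contained program.

// Sketch only: the fixtures (output, USER_SCHEMA, USER_RECORDS, writeFile, fromFolder)
// come from the test class in the diff above and are assumed to be in scope.
final ResourceId file =
    fromFolder(output)
        .resolve("file.parquet", ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
writeFile(file);

final ParquetAvroFileOperations<GenericRecord> fileOperations =
    ParquetAvroFileOperations.of(USER_SCHEMA);

// Reading through iterator() goes through the reader shown in the first file,
// where the GenericDataSupplier default is now applied when no record class is given.
final List<GenericRecord> actual = new ArrayList<>();
fileOperations.iterator(file).forEachRemaining(actual::add);

Assert.assertEquals(USER_RECORDS, actual);
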
