restore removed protected fields
wgtmac committed Apr 26, 2024
1 parent 6a6fe9c commit 2bfaab1
Showing 5 changed files with 41 additions and 30 deletions.
Changed file 1 of 5
@@ -192,9 +192,9 @@ public Builder<T> withCompatibility(boolean enableCompatibility) {
     @Override
     protected ReadSupport<T> getReadSupport() {
       if (isReflect) {
-        conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+        configuration.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
       } else {
-        conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, enableCompatibility);
+        configuration.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, enableCompatibility);
       }
       return new AvroReadSupport<T>(model);
     }
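For context (not part of this commit): a minimal usage sketch of the builder whose getReadSupport() is shown above. When the data model is reflect-based, isReflect is true and the builder forces AvroReadSupport.AVRO_COMPATIBILITY to false — the branch this hunk switches from the deprecated conf field to the ParquetConfiguration configuration field. The file path and POJO class below are hypothetical.

import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ReflectReadSketch {
  // Hypothetical POJO matching the file schema.
  public static class MyPojo {
    public String name;
    public int value;
  }

  public static void main(String[] args) throws Exception {
    // ReflectData.get() makes isReflect true, so getReadSupport() above
    // sets AVRO_COMPATIBILITY to false on the configuration.
    try (ParquetReader<MyPojo> reader = AvroParquetReader.<MyPojo>builder(new Path("/tmp/pojos.parquet"))
        .withDataModel(ReflectData.get())
        .build()) {
      MyPojo pojo;
      while ((pojo = reader.read()) != null) {
        System.out.println(pojo.name + "=" + pojo.value);
      }
    }
  }
}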
Changed file 2 of 5
@@ -51,9 +51,13 @@ public class CodecFactory implements CompressionCodecFactory {
   private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<>();
   private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<>();

-  protected final ParquetConfiguration configuration;
+  protected final ParquetConfiguration conf;
   protected final int pageSize;

+  // May be null if parquetConfiguration is not an instance of org.apache.parquet.conf.HadoopParquetConfiguration
+  @Deprecated
+  protected final Configuration configuration;
+
   static final BytesDecompressor NO_OP_DECOMPRESSOR = new BytesDecompressor() {
     @Override
     public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) {
@@ -115,7 +119,12 @@ public CodecFactory(Configuration configuration, int pageSize) {
    *     decompressors this parameter has no impact on the function of the factory
    */
   public CodecFactory(ParquetConfiguration configuration, int pageSize) {
-    this.configuration = configuration;
+    if (configuration instanceof HadoopParquetConfiguration) {
+      this.configuration = ((HadoopParquetConfiguration) configuration).getConfiguration();
+    } else {
+      this.configuration = null;
+    }
+    this.conf = configuration;
     this.pageSize = pageSize;
   }

@@ -293,7 +302,7 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) {
         codecClass = new Configuration(false).getClassLoader().loadClass(codecClassName);
       }
       codec = (CompressionCodec)
-          ReflectionUtils.newInstance(codecClass, ConfigurationUtil.createHadoopConfiguration(configuration));
+          ReflectionUtils.newInstance(codecClass, ConfigurationUtil.createHadoopConfiguration(conf));
       CODEC_BY_NAME.put(codecCacheKey, codec);
       return codec;
     } catch (ClassNotFoundException e) {
@@ -305,13 +314,13 @@ private String cacheKey(CompressionCodecName codecName) {
     String level = null;
     switch (codecName) {
       case GZIP:
-        level = configuration.get("zlib.compress.level");
+        level = conf.get("zlib.compress.level");
         break;
       case BROTLI:
-        level = configuration.get("compression.brotli.quality");
+        level = conf.get("compression.brotli.quality");
         break;
       case ZSTD:
-        level = configuration.get("parquet.compression.codec.zstd.level");
+        level = conf.get("parquet.compression.codec.zstd.level");
         break;
       default:
         // compression level is not supported; ignore it
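For context (not part of this commit): the restored, deprecated Configuration field exists for binary compatibility with code compiled against earlier releases that reads the protected Hadoop field directly. A hedged sketch of such a subclass (class and method names are hypothetical); without the restored field it would break at link time with NoSuchFieldError.

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.CodecFactory;

// Hypothetical third-party subclass compiled against an older parquet-hadoop release.
public class LoggingCodecFactory extends CodecFactory {

  public LoggingCodecFactory(Configuration conf, int pageSize) {
    super(conf, pageSize);
  }

  public String zstdLevel() {
    // Reads the restored @Deprecated field; as the new comment in the diff warns,
    // it may be null when the factory was built from a non-Hadoop ParquetConfiguration.
    return configuration == null ? null : configuration.get("parquet.compression.codec.zstd.level");
  }
}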
Changed file 3 of 5
@@ -410,9 +410,9 @@ private class ZstdCompressor extends BaseCompressor {

     ZstdCompressor() {
       context = new ZstdCompressCtx();
-      context.setLevel(configuration.getInt(
+      context.setLevel(conf.getInt(
           ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL));
-      context.setWorkers(configuration.getInt(
+      context.setWorkers(conf.getInt(
           ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS));
     }

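For context (not part of this commit): a short sketch of how the zstd level and worker count read in the constructor above are typically supplied, using only the ZstandardCodec constants referenced in the hunk; the chosen values (6 and 4) are arbitrary examples.

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.codec.ZstandardCodec;

public class ZstdSettingsSketch {
  public static Configuration zstdTuned() {
    Configuration conf = new Configuration();
    conf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, 6);   // compression level
    conf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, 4); // background worker threads
    return conf;
  }
}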
Changed file 4 of 5
@@ -196,27 +196,31 @@ public static class Builder<T> {
     private final Path path;
     private Filter filter = null;
     private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
-    protected ParquetConfiguration conf;
+    protected ParquetConfiguration configuration;
     private ParquetReadOptions.Builder optionsBuilder;

+    // May be null if parquetConfiguration is not an instance of org.apache.parquet.conf.HadoopParquetConfiguration
+    @Deprecated
+    protected Configuration conf;
+
     @Deprecated
     private Builder(ReadSupport<T> readSupport, Path path) {
       this.readSupport = Objects.requireNonNull(readSupport, "readSupport cannot be null");
       this.file = null;
       this.path = Objects.requireNonNull(path, "path cannot be null");
-      Configuration hadoopConf = new Configuration();
-      this.conf = new HadoopParquetConfiguration(hadoopConf);
-      this.optionsBuilder = HadoopReadOptions.builder(hadoopConf, path);
+      this.conf = new Configuration();
+      this.configuration = new HadoopParquetConfiguration(this.conf);
+      this.optionsBuilder = HadoopReadOptions.builder(this.conf, path);
     }

     @Deprecated
     protected Builder(Path path) {
       this.readSupport = null;
       this.file = null;
       this.path = Objects.requireNonNull(path, "path cannot be null");
-      Configuration hadoopConf = new Configuration();
-      this.conf = new HadoopParquetConfiguration(hadoopConf);
-      this.optionsBuilder = HadoopReadOptions.builder(hadoopConf, path);
+      this.conf = new Configuration();
+      this.configuration = new HadoopParquetConfiguration(this.conf);
+      this.optionsBuilder = HadoopReadOptions.builder(this.conf, path);
     }

     protected Builder(InputFile file) {
@@ -225,9 +229,9 @@ protected Builder(InputFile file) {
       this.path = null;
       if (file instanceof HadoopInputFile) {
         HadoopInputFile hadoopFile = (HadoopInputFile) file;
-        Configuration hadoopConf = hadoopFile.getConfiguration();
-        this.conf = new HadoopParquetConfiguration(hadoopConf);
-        optionsBuilder = HadoopReadOptions.builder(hadoopConf, hadoopFile.getPath());
+        this.conf = hadoopFile.getConfiguration();
+        this.configuration = new HadoopParquetConfiguration(this.conf);
+        optionsBuilder = HadoopReadOptions.builder(this.conf, hadoopFile.getPath());
       } else {
         optionsBuilder = ParquetReadOptions.builder(new HadoopParquetConfiguration());
       }
@@ -237,19 +241,20 @@ protected Builder(InputFile file, ParquetConfiguration conf) {
       this.readSupport = null;
       this.file = Objects.requireNonNull(file, "file cannot be null");
       this.path = null;
-      this.conf = conf;
+      this.configuration = conf;
       if (file instanceof HadoopInputFile) {
+        this.conf = ConfigurationUtil.createHadoopConfiguration(conf);
         HadoopInputFile hadoopFile = (HadoopInputFile) file;
-        optionsBuilder = HadoopReadOptions.builder(
-            ConfigurationUtil.createHadoopConfiguration(conf), hadoopFile.getPath());
+        optionsBuilder = HadoopReadOptions.builder(this.conf, hadoopFile.getPath());
       } else {
         optionsBuilder = ParquetReadOptions.builder(conf);
       }
     }

     // when called, resets options to the defaults from conf
     public Builder<T> withConf(Configuration conf) {
-      this.conf = new HadoopParquetConfiguration(Objects.requireNonNull(conf, "conf cannot be null"));
+      this.conf = Objects.requireNonNull(conf, "conf cannot be null");
+      this.configuration = new HadoopParquetConfiguration(this.conf);

       // previous versions didn't use the builder, so may set filter before conf. this maintains
       // compatibility for filter. other options are reset by a new conf.
@@ -262,7 +267,7 @@ public Builder<T> withConf(Configuration conf) {
     }

     public Builder<T> withConf(ParquetConfiguration conf) {
-      this.conf = conf;
+      this.configuration = conf;
       this.optionsBuilder = ParquetReadOptions.builder(conf);
       if (filter != null) {
         optionsBuilder.withRecordFilter(filter);
@@ -383,7 +388,7 @@ public ParquetReader<T> build() throws IOException {
       ParquetReadOptions options = optionsBuilder.withAllocator(allocator).build();

       if (path != null) {
-        Configuration hadoopConf = ConfigurationUtil.createHadoopConfiguration(conf);
+        Configuration hadoopConf = ConfigurationUtil.createHadoopConfiguration(configuration);
         FileSystem fs = path.getFileSystem(hadoopConf);
         FileStatus stat = fs.getFileStatus(path);

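For context (not part of this commit): the two withConf overloads touched above, as seen from user code. The Hadoop overload now populates both the deprecated Configuration conf field and the ParquetConfiguration configuration field, while the ParquetConfiguration overload only sets the latter. The file path below is hypothetical.

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.hadoop.ParquetReader;

public class WithConfSketch {
  public static void main(String[] args) throws Exception {
    // Classic Hadoop-flavoured configuration.
    Configuration hadoopConf = new Configuration();
    try (ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(new Path("/tmp/example.parquet"))
            .withConf(hadoopConf)
            .build()) {
      System.out.println(reader.read());
    }

    // Same read through the ParquetConfiguration abstraction
    // (here backed by a fresh Hadoop Configuration).
    try (ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(new Path("/tmp/example.parquet"))
            .withConf(new HadoopParquetConfiguration())
            .build()) {
      System.out.println(reader.read());
    }
  }
}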
Changed file 5 of 5: pom.xml (1 addition, 4 deletions)
@@ -577,13 +577,10 @@
         </excludeModules>
         <excludes>
           <exclude>${shade.prefix}</exclude>
-          <!-- Due to protected field type change from Configuration to ParquetConfiguration -->
-          <exclude>org.apache.parquet.hadoop.CodecFactory#configuration</exclude>
-          <exclude>org.apache.parquet.hadoop.ParquetReader$Builder#conf</exclude>
+          <!-- Removal of a protected method in a class that's not supposed to be subclassed by third-party code -->
           <exclude>org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReader#gatherElementDataFromStreams(byte[])</exclude>
           <!-- Due to the removal of deprecated methods -->
           <exclude>org.apache.parquet.arrow.schema.SchemaMapping</exclude>
           <exclude>org.apache.parquet.arrow.schema.SchemaMapping$TypeMappingVisitor#visit(org.apache.parquet.arrow.schema.SchemaMapping$MapTypeMapping)</exclude>
           <!-- Make static variables final -->
           <exclude>org.apache.parquet.avro.AvroReadSupport#AVRO_REQUESTED_PROJECTION</exclude>
           <exclude>org.apache.parquet.avro.AvroReadSupport#AVRO_DATA_SUPPLIER</exclude>