PARQUET-2463: Bump japicmp to 0.21.0 (#1329)
wgtmac authored Apr 26, 2024
1 parent 09445b5 commit 23c788d
Showing 9 changed files with 154 additions and 58 deletions.
@@ -192,9 +192,9 @@ public Builder<T> withCompatibility(boolean enableCompatibility) {
@Override
protected ReadSupport<T> getReadSupport() {
if (isReflect) {
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
configuration.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
} else {
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, enableCompatibility);
configuration.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, enableCompatibility);
}
return new AvroReadSupport<T>(model);
}
@@ -86,7 +86,9 @@ private Binary() {}

public abstract ByteBuffer toByteBuffer();

public abstract short get2BytesLittleEndian();
public short get2BytesLittleEndian() {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public boolean equals(Object obj) {
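Replacing the abstract declaration with a concrete throwing default keeps Binary compatible for code built against earlier releases; the method only fails if an implementation does not override it, and it fails with a catchable exception rather than a linkage error. A minimal caller sketch, not part of this commit, assuming only the existing Binary.fromString factory:

import org.apache.parquet.io.api.Binary;

public class Get2BytesLittleEndianExample {
  public static void main(String[] args) {
    Binary value = Binary.fromString("ab"); // two bytes: 0x61, 0x62
    try {
      // Built-in Binary implementations override this; one that does not
      // inherits the throwing default added above.
      short le = value.get2BytesLittleEndian();
      System.out.println("little-endian short: " + le);
    } catch (UnsupportedOperationException e) {
      System.out.println("this Binary implementation does not support get2BytesLittleEndian()");
    }
  }
}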
@@ -51,9 +51,13 @@ public class CodecFactory implements CompressionCodecFactory {
private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<>();
private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<>();

protected final ParquetConfiguration configuration;
protected final ParquetConfiguration conf;
protected final int pageSize;

// May be null if parquetConfiguration is not an instance of org.apache.parquet.conf.HadoopParquetConfiguration
@Deprecated
protected final Configuration configuration;

static final BytesDecompressor NO_OP_DECOMPRESSOR = new BytesDecompressor() {
@Override
public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) {
@@ -115,7 +119,12 @@ public CodecFactory(Configuration configuration, int pageSize) {
* decompressors this parameter has no impact on the function of the factory
*/
public CodecFactory(ParquetConfiguration configuration, int pageSize) {
this.configuration = configuration;
if (configuration instanceof HadoopParquetConfiguration) {
this.configuration = ((HadoopParquetConfiguration) configuration).getConfiguration();
} else {
this.configuration = null;
}
this.conf = configuration;
this.pageSize = pageSize;
}

@@ -293,7 +302,7 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) {
codecClass = new Configuration(false).getClassLoader().loadClass(codecClassName);
}
codec = (CompressionCodec)
ReflectionUtils.newInstance(codecClass, ConfigurationUtil.createHadoopConfiguration(configuration));
ReflectionUtils.newInstance(codecClass, ConfigurationUtil.createHadoopConfiguration(conf));
CODEC_BY_NAME.put(codecCacheKey, codec);
return codec;
} catch (ClassNotFoundException e) {
@@ -305,13 +314,13 @@ private String cacheKey(CompressionCodecName codecName) {
String level = null;
switch (codecName) {
case GZIP:
level = configuration.get("zlib.compress.level");
level = conf.get("zlib.compress.level");
break;
case BROTLI:
level = configuration.get("compression.brotli.quality");
level = conf.get("compression.brotli.quality");
break;
case ZSTD:
level = configuration.get("parquet.compression.codec.zstd.level");
level = conf.get("parquet.compression.codec.zstd.level");
break;
default:
// compression level is not supported; ignore it
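After the rename, conf holds the ParquetConfiguration and the deprecated Hadoop configuration field is retained only for older subclasses; it stays null unless a HadoopParquetConfiguration is supplied. A rough construction sketch, not from this commit, with an arbitrary page size:

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.hadoop.CodecFactory;

public class CodecFactoryConstructionExample {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    int pageSize = 1024 * 1024; // arbitrary page size for the example

    // Pre-existing Hadoop-based constructor: unchanged public API.
    CodecFactory fromHadoop = new CodecFactory(hadoopConf, pageSize);

    // ParquetConfiguration-based constructor: the deprecated Hadoop `configuration`
    // field is only non-null when a HadoopParquetConfiguration is passed in.
    CodecFactory fromParquetConf = new CodecFactory(new HadoopParquetConfiguration(hadoopConf), pageSize);

    fromHadoop.release();
    fromParquetConf.release();
  }
}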
@@ -50,6 +50,7 @@
import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
import org.apache.parquet.format.BlockCipher;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
@@ -513,6 +514,20 @@ public void writeBloomFilter(BloomFilter bloomFilter) {
new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
private final MessageType schema;

@Deprecated
public ColumnChunkPageWriteStore(
BytesCompressor compressor,
MessageType schema,
ByteBufferAllocator allocator,
int columnIndexTruncateLength) {
this(
(BytesInputCompressor) compressor,
schema,
allocator,
columnIndexTruncateLength,
ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
}

public ColumnChunkPageWriteStore(
BytesInputCompressor compressor,
MessageType schema,
@@ -526,6 +541,16 @@ public ColumnChunkPageWriteStore(
ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
}

@Deprecated
public ColumnChunkPageWriteStore(
BytesCompressor compressor,
MessageType schema,
ByteBufferAllocator allocator,
int columnIndexTruncateLength,
boolean pageWriteChecksumEnabled) {
this((BytesInputCompressor) compressor, schema, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled);
}

public ColumnChunkPageWriteStore(
BytesInputCompressor compressor,
MessageType schema,
@@ -550,6 +575,25 @@ public ColumnChunkPageWriteStore(
}
}

@Deprecated
public ColumnChunkPageWriteStore(
BytesCompressor compressor,
MessageType schema,
ByteBufferAllocator allocator,
int columnIndexTruncateLength,
boolean pageWriteChecksumEnabled,
InternalFileEncryptor fileEncryptor,
int rowGroupOrdinal) {
this(
(BytesInputCompressor) compressor,
schema,
allocator,
columnIndexTruncateLength,
pageWriteChecksumEnabled,
fileEncryptor,
rowGroupOrdinal);
}

public ColumnChunkPageWriteStore(
BytesInputCompressor compressor,
MessageType schema,
@@ -410,9 +410,9 @@ private class ZstdCompressor extends BaseCompressor {

ZstdCompressor() {
context = new ZstdCompressCtx();
context.setLevel(configuration.getInt(
context.setLevel(conf.getInt(
ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL));
context.setWorkers(configuration.getInt(
context.setWorkers(conf.getInt(
ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS));
}

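For reference, the level and worker settings that this compressor reads can be supplied through the usual Hadoop configuration, using the same ZstandardCodec constants that appear in the diff; a brief sketch, not from this commit, with arbitrary values:

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.codec.ZstandardCodec;

public class ZstdCodecConfigExample {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    // Compression level and worker count read by ZstdCompressor above (values are arbitrary).
    hadoopConf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, 6);
    hadoopConf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, 4);
    System.out.println(hadoopConf.get(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL));
  }
}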
@@ -196,27 +196,31 @@ public static class Builder<T> {
private final Path path;
private Filter filter = null;
private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
protected ParquetConfiguration conf;
protected ParquetConfiguration configuration;
private ParquetReadOptions.Builder optionsBuilder;

// May be null if parquetConfiguration is not an instance of org.apache.parquet.conf.HadoopParquetConfiguration
@Deprecated
protected Configuration conf;

@Deprecated
private Builder(ReadSupport<T> readSupport, Path path) {
this.readSupport = Objects.requireNonNull(readSupport, "readSupport cannot be null");
this.file = null;
this.path = Objects.requireNonNull(path, "path cannot be null");
Configuration hadoopConf = new Configuration();
this.conf = new HadoopParquetConfiguration(hadoopConf);
this.optionsBuilder = HadoopReadOptions.builder(hadoopConf, path);
this.conf = new Configuration();
this.configuration = new HadoopParquetConfiguration(this.conf);
this.optionsBuilder = HadoopReadOptions.builder(this.conf, path);
}

@Deprecated
protected Builder(Path path) {
this.readSupport = null;
this.file = null;
this.path = Objects.requireNonNull(path, "path cannot be null");
Configuration hadoopConf = new Configuration();
this.conf = new HadoopParquetConfiguration(hadoopConf);
this.optionsBuilder = HadoopReadOptions.builder(hadoopConf, path);
this.conf = new Configuration();
this.configuration = new HadoopParquetConfiguration(this.conf);
this.optionsBuilder = HadoopReadOptions.builder(this.conf, path);
}

protected Builder(InputFile file) {
@@ -225,9 +229,9 @@ protected Builder(InputFile file) {
this.path = null;
if (file instanceof HadoopInputFile) {
HadoopInputFile hadoopFile = (HadoopInputFile) file;
Configuration hadoopConf = hadoopFile.getConfiguration();
this.conf = new HadoopParquetConfiguration(hadoopConf);
optionsBuilder = HadoopReadOptions.builder(hadoopConf, hadoopFile.getPath());
this.conf = hadoopFile.getConfiguration();
this.configuration = new HadoopParquetConfiguration(this.conf);
optionsBuilder = HadoopReadOptions.builder(this.conf, hadoopFile.getPath());
} else {
optionsBuilder = ParquetReadOptions.builder(new HadoopParquetConfiguration());
}
@@ -237,19 +241,20 @@ protected Builder(InputFile file, ParquetConfiguration conf) {
this.readSupport = null;
this.file = Objects.requireNonNull(file, "file cannot be null");
this.path = null;
this.conf = conf;
this.configuration = conf;
if (file instanceof HadoopInputFile) {
this.conf = ConfigurationUtil.createHadoopConfiguration(conf);
HadoopInputFile hadoopFile = (HadoopInputFile) file;
optionsBuilder = HadoopReadOptions.builder(
ConfigurationUtil.createHadoopConfiguration(conf), hadoopFile.getPath());
optionsBuilder = HadoopReadOptions.builder(this.conf, hadoopFile.getPath());
} else {
optionsBuilder = ParquetReadOptions.builder(conf);
}
}

// when called, resets options to the defaults from conf
public Builder<T> withConf(Configuration conf) {
this.conf = new HadoopParquetConfiguration(Objects.requireNonNull(conf, "conf cannot be null"));
this.conf = Objects.requireNonNull(conf, "conf cannot be null");
this.configuration = new HadoopParquetConfiguration(this.conf);

// previous versions didn't use the builder, so may set filter before conf. this maintains
// compatibility for filter. other options are reset by a new conf.
@@ -262,7 +267,7 @@ public Builder<T> withConf(Configuration conf) {
}

public Builder<T> withConf(ParquetConfiguration conf) {
this.conf = conf;
this.configuration = conf;
this.optionsBuilder = ParquetReadOptions.builder(conf);
if (filter != null) {
optionsBuilder.withRecordFilter(filter);
@@ -383,7 +388,7 @@ public ParquetReader<T> build() throws IOException {
ParquetReadOptions options = optionsBuilder.withAllocator(allocator).build();

if (path != null) {
Configuration hadoopConf = ConfigurationUtil.createHadoopConfiguration(conf);
Configuration hadoopConf = ConfigurationUtil.createHadoopConfiguration(configuration);
FileSystem fs = path.getFileSystem(hadoopConf);
FileStatus stat = fs.getFileStatus(path);

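The builder keeps both withConf overloads: the Hadoop overload now also refreshes the HadoopParquetConfiguration wrapper, and the ParquetConfiguration overload is unchanged. A usage sketch, not from this commit, with a placeholder path and the example Group read support:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class ParquetReaderBuilderExample {
  public static void main(String[] args) throws Exception {
    Path path = new Path("/tmp/example.parquet"); // placeholder path

    // Hadoop Configuration overload: resets options and populates the deprecated `conf` field.
    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
        .withConf(new Configuration())
        .build()) {
      System.out.println(reader.read());
    }

    // ParquetConfiguration overload from the pluggable configuration API.
    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
        .withConf(new HadoopParquetConfiguration())
        .build()) {
      System.out.println(reader.read());
    }
  }
}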
@@ -27,6 +27,7 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
@@ -43,6 +44,33 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
private final MemoryManager memoryManager;
private final CodecFactory codecFactory;

@Deprecated
public ParquetRecordWriter(
ParquetFileWriter w,
WriteSupport<T> writeSupport,
MessageType schema,
Map<String, String> extraMetaData,
int blockSize,
int pageSize,
BytesCompressor compressor,
int dictionaryPageSize,
boolean enableDictionary,
boolean validating,
WriterVersion writerVersion) {
this(
w,
writeSupport,
schema,
extraMetaData,
blockSize,
pageSize,
(BytesInputCompressor) compressor,
dictionaryPageSize,
enableDictionary,
validating,
writerVersion);
}

/**
* @param w the file to write to
* @param writeSupport the class to convert incoming records
@@ -81,6 +109,35 @@ public ParquetRecordWriter(
this.codecFactory = null;
}

@Deprecated
public ParquetRecordWriter(
ParquetFileWriter w,
WriteSupport<T> writeSupport,
MessageType schema,
Map<String, String> extraMetaData,
long blockSize,
int pageSize,
BytesCompressor compressor,
int dictionaryPageSize,
boolean enableDictionary,
boolean validating,
WriterVersion writerVersion,
MemoryManager memoryManager) {
this(
w,
writeSupport,
schema,
extraMetaData,
blockSize,
pageSize,
(BytesInputCompressor) compressor,
dictionaryPageSize,
enableDictionary,
validating,
writerVersion,
memoryManager);
}

/**
* @param w the file to write to
* @param writeSupport the class to convert incoming records
@@ -134,7 +134,9 @@ public interface StateVisitor<R, S> {

R visit(StringType stringType, S state);

R visit(UUIDType uuidType, S state);
default R visit(UUIDType uuidType, S state) {
throw new UnsupportedOperationException("Not implemented");
}
}

/**
@@ -166,7 +168,9 @@ public interface TypeVisitor {

void visit(StringType stringType);

void visit(UUIDType uuidType);
default void visit(UUIDType uuidType) {
throw new UnsupportedOperationException("Not implemented");
}
}

/**
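Both visitor interfaces get the same treatment: the newly added visit(UUIDType, ...) overloads become throwing default methods, so visitors compiled against the previous interface shape keep compiling and linking. A generic illustration of the pattern, using hypothetical ShapeVisitor, Circle, and Square types rather than Parquet classes:

// Hypothetical types for illustration only.
interface ShapeVisitor {
  void visit(Circle circle);

  // Newly added overload: the default body keeps existing implementations
  // source- and binary-compatible; it only throws if a Square is actually visited.
  default void visit(Square square) {
    throw new UnsupportedOperationException("Not implemented");
  }
}

final class Circle {}

final class Square {}

// Written before Square existed; still compiles and links unchanged.
class CircleOnlyVisitor implements ShapeVisitor {
  @Override
  public void visit(Circle circle) {
    System.out.println("visited a circle");
  }
}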