Skip to content

Commit

Permalink
[GH] Optimize the code structure of ColumnChunkMetaData.
Browse files Browse the repository at this point in the history
  • Loading branch information
joyCurry30 committed Oct 24, 2024
1 parent 42cf31c commit 73705ba
Show file tree
Hide file tree
Showing 29 changed files with 844 additions and 740 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public Map<String, Long> getColumnSizeInBytes(Path inputFile) throws IOException
for (BlockMetaData block : pmd.getBlocks()) {
for (ColumnChunkMetaData column : block.getColumns()) {
String colName = column.getPath().toDotString();
colSizes.put(colName, column.getTotalSize() + colSizes.getOrDefault(colName, 0L));
colSizes.put(colName, column.getTotalSizeWithDecrypt() + colSizes.getOrDefault(colName, 0L));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,15 @@ private void printColumnChunk(Logger console, int width, ColumnChunkMetaData col
Preconditions.checkNotNull(type);

ColumnDescriptor desc = schema.getColumnDescription(path);
long size = column.getTotalSize();
long count = column.getValueCount();
long size = column.getTotalSizeWithDecrypt();
long count = column.getValueCountWithDecrypt();
float perValue = ((float) size) / count;
CompressionCodecName codec = column.getCodec();
Set<Encoding> encodings = column.getEncodings();
EncodingStats encodingStats = column.getEncodingStats();
String encodingSummary =
encodingStats == null ? encodingsAsString(encodings, desc) : encodingStatsAsString(encodingStats);
Statistics stats = column.getStatistics();
Statistics stats = column.getStatisticsWithDecrypt();

String name = column.getPath().toDotString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ public <T extends Comparable<T>> Boolean visit(NotEq<T> notEq) {

try {
Set<T> dictSet = expandDictionary(meta);
boolean mayContainNull = (meta.getStatistics() == null
|| !meta.getStatistics().isNumNullsSet()
|| meta.getStatistics().getNumNulls() > 0);
boolean mayContainNull = (meta.getStatisticsWithDecrypt() == null
|| !meta.getStatisticsWithDecrypt().isNumNullsSet()
|| meta.getStatisticsWithDecrypt().getNumNulls() > 0);
if (dictSet != null && dictSet.size() == 1 && dictSet.contains(value) && !mayContainNull) {
return BLOCK_CANNOT_MATCH;
}
Expand Down Expand Up @@ -461,9 +461,9 @@ public <T extends Comparable<T>> Boolean visit(NotIn<T> notIn) {
return BLOCK_MIGHT_MATCH;
}

boolean mayContainNull = (meta.getStatistics() == null
|| !meta.getStatistics().isNumNullsSet()
|| meta.getStatistics().getNumNulls() > 0);
boolean mayContainNull = (meta.getStatisticsWithDecrypt() == null
|| !meta.getStatisticsWithDecrypt().isNumNullsSet()
|| meta.getStatisticsWithDecrypt().getNumNulls() > 0);
// The column may contain nulls and the values set contains no null, so the row group cannot be eliminated.
if (mayContainNull) {
return BLOCK_MIGHT_MATCH;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) {
// is this column chunk composed entirely of nulls?
// assumes the column chunk's statistics is not empty
private boolean isAllNulls(ColumnChunkMetaData column) {
return column.getStatistics().getNumNulls() == column.getValueCount();
return column.getStatisticsWithDecrypt().getNumNulls() == column.getValueCountWithDecrypt();
}

// are there any nulls in this column chunk?
// assumes the column chunk's statistics is not empty
private boolean hasNulls(ColumnChunkMetaData column) {
return column.getStatistics().getNumNulls() > 0;
return column.getStatisticsWithDecrypt().getNumNulls() > 0;
}

@Override
Expand All @@ -116,7 +116,7 @@ public <T extends Comparable<T>> Boolean visit(Eq<T> eq) {
return BLOCK_MIGHT_MATCH;
}

Statistics<T> stats = meta.getStatistics();
Statistics<T> stats = meta.getStatisticsWithDecrypt();

if (stats.isEmpty()) {
// we have no statistics available, we cannot drop any chunks
Expand Down Expand Up @@ -165,7 +165,7 @@ public <T extends Comparable<T>> Boolean visit(In<T> in) {
return BLOCK_MIGHT_MATCH;
}

Statistics<T> stats = meta.getStatistics();
Statistics<T> stats = meta.getStatisticsWithDecrypt();

if (stats.isEmpty()) {
// we have no statistics available, we cannot drop any chunks
Expand Down Expand Up @@ -233,7 +233,7 @@ public <T extends Comparable<T>> Boolean visit(NotEq<T> notEq) {
return BLOCK_MIGHT_MATCH;
}

Statistics<T> stats = meta.getStatistics();
Statistics<T> stats = meta.getStatisticsWithDecrypt();

if (stats.isEmpty()) {
// we have no statistics available, we cannot drop any chunks
Expand Down Expand Up @@ -273,7 +273,7 @@ public <T extends Comparable<T>> Boolean visit(Lt<T> lt) {
return BLOCK_CANNOT_MATCH;
}

Statistics<T> stats = meta.getStatistics();
Statistics<T> stats = meta.getStatisticsWithDecrypt();

if (stats.isEmpty()) {
// we have no statistics available, we cannot drop any chunks
Expand Down Expand Up @@ -309,7 +309,7 @@ public <T extends Comparable<T>> Boolean visit(LtEq<T> ltEq) {
return BLOCK_CANNOT_MATCH;
}

Statistics<T> stats = meta.getStatistics();
Statistics<T> stats = meta.getStatisticsWithDecrypt();

if (stats.isEmpty()) {
// we have no statistics available, we cannot drop any chunks
Expand Down Expand Up @@ -345,7 +345,7 @@ public <T extends Comparable<T>> Boolean visit(Gt<T> gt) {
return BLOCK_CANNOT_MATCH;
}

Statistics<T> stats = meta.getStatistics();
Statistics<T> stats = meta.getStatisticsWithDecrypt();

if (stats.isEmpty()) {
// we have no statistics available, we cannot drop any chunks
Expand Down Expand Up @@ -381,7 +381,7 @@ public <T extends Comparable<T>> Boolean visit(GtEq<T> gtEq) {
return BLOCK_CANNOT_MATCH;
}

Statistics<T> stats = meta.getStatistics();
Statistics<T> stats = meta.getStatisticsWithDecrypt();

if (stats.isEmpty()) {
// we have no statistics available, we cannot drop any chunks
Expand Down Expand Up @@ -446,7 +446,7 @@ private <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean vis
}
}

Statistics<T> stats = columnChunk.getStatistics();
Statistics<T> stats = columnChunk.getStatisticsWithDecrypt();

if (stats.isEmpty()) {
// we have no statistics available, we cannot drop any chunks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,14 +553,14 @@ private void addRowGroup(
toFormatEncodings(columnMetaData.getEncodings()),
columnMetaData.getPath().toList(),
toFormatCodec(columnMetaData.getCodec()),
columnMetaData.getValueCount(),
columnMetaData.getTotalUncompressedSize(),
columnMetaData.getTotalSize(),
columnMetaData.getFirstDataPageOffset());
columnMetaData.getValueCountWithDecrypt(),
columnMetaData.getTotalUncompressedSizeWithDecrypt(),
columnMetaData.getTotalSizeWithDecrypt(),
columnMetaData.getFirstDataPageOffsetWithDecrypt());
if ((columnMetaData.getEncodingStats() != null
&& columnMetaData.getEncodingStats().hasDictionaryPages())
|| columnMetaData.hasDictionaryPage()) {
metaData.setDictionary_page_offset(columnMetaData.getDictionaryPageOffset());
metaData.setDictionary_page_offset(columnMetaData.getDictionaryPageOffsetWithDecrypt());
}
long bloomFilterOffset = columnMetaData.getBloomFilterOffset();
if (bloomFilterOffset >= 0) {
Expand All @@ -570,17 +570,17 @@ private void addRowGroup(
if (bloomFilterLength >= 0) {
metaData.setBloom_filter_length(bloomFilterLength);
}
if (columnMetaData.getStatistics() != null
&& !columnMetaData.getStatistics().isEmpty()) {
if (columnMetaData.getStatisticsWithDecrypt() != null
&& !columnMetaData.getStatisticsWithDecrypt().isEmpty()) {
metaData.setStatistics(
toParquetStatistics(columnMetaData.getStatistics(), this.statisticsTruncateLength));
toParquetStatistics(columnMetaData.getStatisticsWithDecrypt(), this.statisticsTruncateLength));
}
if (columnMetaData.getEncodingStats() != null) {
metaData.setEncoding_stats(convertEncodingStats(columnMetaData.getEncodingStats()));
}
if (columnMetaData.getSizeStatistics() != null
&& columnMetaData.getSizeStatistics().isValid()) {
metaData.setSize_statistics(toParquetSizeStatistics(columnMetaData.getSizeStatistics()));
if (columnMetaData.getSizeStatisticsWithDecrypt() != null
&& columnMetaData.getSizeStatisticsWithDecrypt().isValid()) {
metaData.setSize_statistics(toParquetSizeStatistics(columnMetaData.getSizeStatisticsWithDecrypt()));
}

if (!encryptMetaData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public static Offsets getOffsets(SeekableInputStream input, ColumnChunkMetaData
long dictionaryPageOffset;
if (chunk.hasDictionaryPage()) {
long dictionaryPageSize;
if (chunk.getDictionaryPageOffset() == 0
|| chunk.getFirstDataPageOffset() <= chunk.getDictionaryPageOffset()) {
if (chunk.getDictionaryPageOffsetWithDecrypt() == 0
|| chunk.getFirstDataPageOffsetWithDecrypt() <= chunk.getDictionaryPageOffsetWithDecrypt()) {
/*
* The offsets might not contain the proper values (so we need to read the dictionary page header):
* - The dictionaryPageOffset might not be set; in this case 0 is returned
Expand All @@ -57,7 +57,8 @@ public static Offsets getOffsets(SeekableInputStream input, ColumnChunkMetaData
*/
dictionaryPageSize = readDictionaryPageSize(input, chunk);
} else {
dictionaryPageSize = chunk.getFirstDataPageOffset() - chunk.getDictionaryPageOffset();
dictionaryPageSize =
chunk.getFirstDataPageOffsetWithDecrypt() - chunk.getDictionaryPageOffsetWithDecrypt();
}
firstDataPageOffset = newChunkStart + dictionaryPageSize;
dictionaryPageOffset = newChunkStart;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1086,14 +1086,15 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE
ColumnPath pathKey = mc.getPath();
ColumnDescriptor columnDescriptor = paths.get(pathKey);
if (columnDescriptor != null) {
BenchmarkCounter.incrementTotalBytes(mc.getTotalSize());
BenchmarkCounter.incrementTotalBytes(mc.getTotalSizeWithDecrypt());
long startingPos = mc.getStartingPos();
// first part or not consecutive => new list
if (currentParts == null || currentParts.endPos() != startingPos) {
currentParts = new ConsecutivePartList(startingPos);
allParts.add(currentParts);
}
currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, mc.getTotalSize()));
currentParts.addChunk(
new ChunkDescriptor(columnDescriptor, mc, startingPos, mc.getTotalSizeWithDecrypt()));
}
}
// actually read all the chunks
Expand Down Expand Up @@ -1955,13 +1956,13 @@ public ColumnChunkPageReader readAllPages(
break;
}
}
if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCountWithDecrypt()) {
// Would be nice to have a CorruptParquetFileException or something as a subclass?
throw new IOException(
"Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " + getPath()
+ " offset " + descriptor.metadata.getFirstDataPageOffset() + " but got "
+ valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
+ " pages ending at file offset " + (descriptor.fileOffset + stream.position()));
throw new IOException("Expected " + descriptor.metadata.getValueCountWithDecrypt()
+ " values in column chunk at " + getPath()
+ " offset " + descriptor.metadata.getFirstDataPageOffsetWithDecrypt() + " but got "
+ valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
+ " pages ending at file offset " + (descriptor.fileOffset + stream.position()));
}
BytesInputDecompressor decompressor =
options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
Expand All @@ -1980,7 +1981,7 @@ public ColumnChunkPageReader readAllPages(

private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) {
return offsetIndex == null
? valuesCountReadSoFar < descriptor.metadata.getValueCount()
? valuesCountReadSoFar < descriptor.metadata.getValueCountWithDecrypt()
: dataPageCountReadSoFar < offsetIndex.getPageCount();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1524,7 +1524,7 @@ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, boo
// no previous chunk included, start at this chunk's starting pos
start = chunk.getStartingPos();
}
length += chunk.getTotalSize();
length += chunk.getTotalSizeWithDecrypt();

if ((i + 1) == columnsInOrder.size() || columnsInOrder.get(i + 1).getStartingPos() != (start + length)) {
// not contiguous. do the copy now.
Expand All @@ -1546,14 +1546,14 @@ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, boo
chunk.getCodec(),
chunk.getEncodingStats(),
chunk.getEncodings(),
chunk.getStatistics(),
chunk.getStatisticsWithDecrypt(),
offsets.firstDataPageOffset,
offsets.dictionaryPageOffset,
chunk.getValueCount(),
chunk.getTotalSize(),
chunk.getTotalUncompressedSize()));
chunk.getValueCountWithDecrypt(),
chunk.getTotalSizeWithDecrypt(),
chunk.getTotalUncompressedSizeWithDecrypt()));

blockUncompressedSize += chunk.getTotalUncompressedSize();
blockUncompressedSize += chunk.getTotalUncompressedSizeWithDecrypt();
}

currentBlock.setTotalByteSize(blockUncompressedSize);
Expand All @@ -1579,7 +1579,7 @@ public void appendColumnChunk(
OffsetIndex offsetIndex)
throws IOException {
long start = chunk.getStartingPos();
long length = chunk.getTotalSize();
long length = chunk.getTotalSizeWithDecrypt();
long newChunkStart = out.getPos();

if (offsetIndex != null && newChunkStart != start) {
Expand All @@ -1600,14 +1600,14 @@ public void appendColumnChunk(
chunk.getCodec(),
chunk.getEncodingStats(),
chunk.getEncodings(),
chunk.getStatistics(),
chunk.getStatisticsWithDecrypt(),
offsets.firstDataPageOffset,
offsets.dictionaryPageOffset,
chunk.getValueCount(),
chunk.getTotalSize(),
chunk.getTotalUncompressedSize()));
chunk.getValueCountWithDecrypt(),
chunk.getTotalSizeWithDecrypt(),
chunk.getTotalUncompressedSizeWithDecrypt()));

currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize());
currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSizeWithDecrypt());
}

// Buffers for the copy function.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ public ParquetInputSplit getParquetInputSplit(
List<ColumnChunkMetaData> columns = block.getColumns();
for (ColumnChunkMetaData column : columns) {
if (requested.containsPath(column.getPath().toArray())) {
length += column.getTotalSize();
length += column.getTotalSizeWithDecrypt();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private static long end(List<BlockMetaData> blocks, String requestedSchema) {
List<ColumnChunkMetaData> columns = block.getColumns();
for (ColumnChunkMetaData column : columns) {
if (requested.containsPath(column.getPath().toArray())) {
length += column.getTotalSize();
length += column.getTotalSizeWithDecrypt();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ private static void add(ParquetMetadata footer) {
schema.getColumnDescription(columnMetaData.getPath().toArray());
add(
desc,
columnMetaData.getValueCount(),
columnMetaData.getTotalSize(),
columnMetaData.getTotalUncompressedSize(),
columnMetaData.getValueCountWithDecrypt(),
columnMetaData.getTotalSizeWithDecrypt(),
columnMetaData.getTotalUncompressedSizeWithDecrypt(),
columnMetaData.getEncodings(),
columnMetaData.getStatistics());
columnMetaData.getStatisticsWithDecrypt());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public String toString() {
public long getCompressedSize() {
long totalSize = 0;
for (ColumnChunkMetaData col : getColumns()) {
totalSize += col.getTotalSize();
totalSize += col.getTotalSizeWithDecrypt();
}
return totalSize;
}
Expand Down
Loading

0 comments on commit 73705ba

Please sign in to comment.