Skip to content

Commit

Permalink
PARQUET-2351: Set options with Configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
amousavigourabi committed Sep 27, 2023
1 parent 3d03770 commit f1a074a
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ public static void setMaxPaddingSize(Configuration conf, int maxPaddingSize) {
conf.setInt(MAX_PADDING_BYTES, maxPaddingSize);
}

private static int getMaxPaddingSize(Configuration conf) {
public static int getMaxPaddingSize(Configuration conf) {
  // Read MAX_PADDING_BYTES from the configuration, handing ParquetWriter's
  // constant to getInt as the fallback argument.
  final int fallback = ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
  return conf.getInt(MAX_PADDING_BYTES, fallback);
}

Expand All @@ -353,7 +353,7 @@ public static void setColumnIndexTruncateLength(Configuration conf, int length)
conf.setInt(COLUMN_INDEX_TRUNCATE_LENGTH, length);
}

private static int getColumnIndexTruncateLength(Configuration conf) {
public static int getColumnIndexTruncateLength(Configuration conf) {
  // Read COLUMN_INDEX_TRUNCATE_LENGTH, handing the ParquetProperties constant
  // to getInt as the fallback argument.
  final int fallback = ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
  return conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, fallback);
}

Expand All @@ -365,7 +365,7 @@ private static void setStatisticsTruncateLength(Configuration conf, int length)
conf.setInt(STATISTICS_TRUNCATE_LENGTH, length);
}

private static int getStatisticsTruncateLength(Configuration conf) {
public static int getStatisticsTruncateLength(Configuration conf) {
  // Read STATISTICS_TRUNCATE_LENGTH, handing the ParquetProperties constant
  // to getInt as the fallback argument.
  final int fallback = ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
  return conf.getInt(STATISTICS_TRUNCATE_LENGTH, fallback);
}

Expand All @@ -377,7 +377,7 @@ public static void setPageRowCountLimit(Configuration conf, int rowCount) {
conf.setInt(PAGE_ROW_COUNT_LIMIT, rowCount);
}

private static int getPageRowCountLimit(Configuration conf) {
public static int getPageRowCountLimit(Configuration conf) {
  // Read PAGE_ROW_COUNT_LIMIT, handing the ParquetProperties constant
  // to getInt as the fallback argument.
  final int fallback = ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT;
  return conf.getInt(PAGE_ROW_COUNT_LIMIT, fallback);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,29 @@ public abstract static class Builder<T, SELF extends Builder<T, SELF>> {
private long rowGroupSize = DEFAULT_BLOCK_SIZE;
private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT;
private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED;
private ParquetProperties.Builder encodingPropsBuilder =
private final ParquetProperties.Builder encodingPropsBuilder =
ParquetProperties.builder();

// Tracking flags, one per configurable option. A flag is switched to true by the
// builder method(s) that set the option explicitly; withConf(Configuration) only
// copies an option from the Configuration while its flag is still false.
// Note that some flags are shared by a file-wide setter and its per-column
// overload (dictionary encoding, bloom filter enabled).
private boolean isPageSizeSet = false;
private boolean isBloomFilterCandidateNumberSet = false;
private boolean isPageRowCountLimitSet = false;
private boolean isDictionaryPageSizeSet = false;
private boolean isDictionaryEncodingSet = false;
private boolean isWriterVersionSet = false;
private boolean isPageWriteChecksumSet = false;
private boolean isMaxBloomFilterBytesSet = false;
private boolean isBloomFilterNDVSet = false;
private boolean isBloomFilterFPPSet = false;
private boolean isAdaptiveBloomFilterSet = false;
private boolean isBloomFilterEnabledSet = false;
private boolean isMinRowCountForPageSizeCheckSet = false;
private boolean isColumnIndexTruncateLengthSet = false;
private boolean isStatisticsTruncateLengthSet = false;
private boolean isMaxRowCountForPageSizeCheckSet = false;
private boolean isMaxPaddingSizeSet = false;
private boolean isCodecNameSet = false;
private boolean isEnableValidationSet = false;

// Creates a builder that keeps the given Path in its path field.
protected Builder(Path path) {
this.path = path;
}
Expand Down Expand Up @@ -388,6 +408,55 @@ protected Builder(OutputFile path) {
*/
public SELF withConf(Configuration conf) {
  // Fail fast on null so a bad argument cannot leave the builder holding a null
  // configuration with only part of its settings applied.
  java.util.Objects.requireNonNull(conf, "conf must not be null");
  this.conf = conf;

  // File-wide options: copy each one from the Configuration unless the caller
  // already set it explicitly through the matching builder method.
  if (!isPageSizeSet) {
    encodingPropsBuilder.withPageSize(ParquetOutputFormat.getPageSize(conf));
  }
  if (!isPageRowCountLimitSet) {
    encodingPropsBuilder.withPageRowCountLimit(ParquetOutputFormat.getPageRowCountLimit(conf));
  }
  if (!isDictionaryPageSizeSet) {
    encodingPropsBuilder.withDictionaryPageSize(ParquetOutputFormat.getDictionaryPageSize(conf));
  }
  if (!isDictionaryEncodingSet) {
    encodingPropsBuilder.withDictionaryEncoding(ParquetOutputFormat.getEnableDictionary(conf));
  }
  if (!isWriterVersionSet) {
    encodingPropsBuilder.withWriterVersion(ParquetOutputFormat.getWriterVersion(conf));
  }
  if (!isPageWriteChecksumSet) {
    encodingPropsBuilder.withPageWriteChecksumEnabled(
        ParquetOutputFormat.getPageWriteChecksumEnabled(conf));
  }
  if (!isMaxBloomFilterBytesSet) {
    encodingPropsBuilder.withMaxBloomFilterBytes(ParquetOutputFormat.getBloomFilterMaxBytes(conf));
  }
  if (!isAdaptiveBloomFilterSet) {
    encodingPropsBuilder.withAdaptiveBloomFilterEnabled(
        ParquetOutputFormat.getAdaptiveBloomFilterEnabled(conf));
  }
  if (!isBloomFilterEnabledSet) {
    encodingPropsBuilder.withBloomFilterEnabled(ParquetOutputFormat.getBloomFilterEnabled(conf));
  }
  if (!isMinRowCountForPageSizeCheckSet) {
    encodingPropsBuilder.withMinRowCountForPageSizeCheck(
        ParquetOutputFormat.getMinRowCountForPageSizeCheck(conf));
  }
  if (!isMaxRowCountForPageSizeCheckSet) {
    encodingPropsBuilder.withMaxRowCountForPageSizeCheck(
        ParquetOutputFormat.getMaxRowCountForPageSizeCheck(conf));
  }
  if (!isColumnIndexTruncateLengthSet) {
    encodingPropsBuilder.withColumnIndexTruncateLength(
        ParquetOutputFormat.getColumnIndexTruncateLength(conf));
  }
  if (!isStatisticsTruncateLengthSet) {
    encodingPropsBuilder.withStatisticsTruncateLength(
        ParquetOutputFormat.getStatisticsTruncateLength(conf));
  }

  // Options kept directly on this builder rather than on encodingPropsBuilder.
  if (!isMaxPaddingSizeSet) {
    maxPaddingSize = ParquetOutputFormat.getMaxPaddingSize(conf);
  }
  if (!isCodecNameSet) {
    codecName = ParquetOutputFormat.getCompression(conf);
  }
  if (!isEnableValidationSet) {
    enableValidation = ParquetOutputFormat.getValidation(conf);
  }

  // Column-scoped options: register a handler per option family that has not been
  // set explicitly, then let the parser process the Configuration once.
  // The dictionary and bloom-filter-enabled flags are shared by the file-wide and
  // per-column setters, so either kind of explicit call skips the handler here.
  ColumnConfigParser columnConfig = new ColumnConfigParser();
  if (!isDictionaryEncodingSet) {
    columnConfig.withColumnConfig(
        ParquetOutputFormat.ENABLE_DICTIONARY,
        key -> conf.getBoolean(key, ParquetOutputFormat.getEnableDictionary(conf)),
        encodingPropsBuilder::withDictionaryEncoding);
  }
  if (!isBloomFilterEnabledSet) {
    columnConfig.withColumnConfig(
        ParquetOutputFormat.BLOOM_FILTER_ENABLED,
        key -> conf.getBoolean(key, ParquetOutputFormat.getBloomFilterEnabled(conf)),
        encodingPropsBuilder::withBloomFilterEnabled);
  }
  if (!isBloomFilterNDVSet) {
    columnConfig.withColumnConfig(
        ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV,
        key -> conf.getLong(key, -1L),
        encodingPropsBuilder::withBloomFilterNDV);
  }
  if (!isBloomFilterFPPSet) {
    columnConfig.withColumnConfig(
        ParquetOutputFormat.BLOOM_FILTER_FPP,
        key -> conf.getDouble(key, ParquetProperties.DEFAULT_BLOOM_FILTER_FPP),
        encodingPropsBuilder::withBloomFilterFPP);
  }
  if (!isBloomFilterCandidateNumberSet) {
    columnConfig.withColumnConfig(
        ParquetOutputFormat.BLOOM_FILTER_CANDIDATES_NUMBER,
        key -> conf.getInt(key, ParquetProperties.DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER),
        encodingPropsBuilder::withBloomFilterCandidatesNumber);
  }
  columnConfig.parseConfig(conf);
  return self();
}

Expand All @@ -411,6 +480,7 @@ public SELF withWriteMode(ParquetFileWriter.Mode mode) {
* @return this builder for method chaining.
*/
public SELF withCompressionCodec(CompressionCodecName codecName) {
  // Record the explicit choice first; withConf(...) checks this flag before
  // replacing the codec with the one from the Configuration.
  this.isCodecNameSet = true;
  this.codecName = codecName;
  return self();
}
Expand Down Expand Up @@ -457,6 +527,7 @@ public SELF withRowGroupSize(long rowGroupSize) {
* @return this builder for method chaining.
*/
public SELF withPageSize(int pageSize) {
  // Mark the page size as caller-supplied so withConf(...) will not overwrite it.
  this.isPageSizeSet = true;
  this.encodingPropsBuilder.withPageSize(pageSize);
  return self();
}
Expand All @@ -468,6 +539,7 @@ public SELF withPageSize(int pageSize) {
* @return this builder for method chaining
*/
public SELF withPageRowCountLimit(int rowCount) {
  // Mark the row-count limit as caller-supplied so withConf(...) will not overwrite it.
  this.isPageRowCountLimitSet = true;
  this.encodingPropsBuilder.withPageRowCountLimit(rowCount);
  return self();
}
Expand All @@ -480,6 +552,7 @@ public SELF withPageRowCountLimit(int rowCount) {
* @return this builder for method chaining.
*/
public SELF withDictionaryPageSize(int dictionaryPageSize) {
  // Mark the dictionary page size as caller-supplied so withConf(...) will not overwrite it.
  this.isDictionaryPageSizeSet = true;
  this.encodingPropsBuilder.withDictionaryPageSize(dictionaryPageSize);
  return self();
}
Expand All @@ -493,6 +566,7 @@ public SELF withDictionaryPageSize(int dictionaryPageSize) {
* @return this builder for method chaining.
*/
public SELF withMaxPaddingSize(int maxPaddingSize) {
  // Record the explicit choice first; withConf(...) checks this flag before
  // replacing the padding size with the one from the Configuration.
  this.isMaxPaddingSizeSet = true;
  this.maxPaddingSize = maxPaddingSize;
  return self();
}
Expand All @@ -503,6 +577,7 @@ public SELF withMaxPaddingSize(int maxPaddingSize) {
* @return this builder for method chaining.
*/
public SELF enableDictionaryEncoding() {
  // Counts as an explicit dictionary-encoding choice for withConf(...).
  this.isDictionaryEncodingSet = true;
  final boolean enabled = true;
  this.encodingPropsBuilder.withDictionaryEncoding(enabled);
  return self();
}
Expand All @@ -514,6 +589,7 @@ public SELF enableDictionaryEncoding() {
* @return this builder for method chaining.
*/
public SELF withDictionaryEncoding(boolean enableDictionary) {
  // Mark dictionary encoding as caller-supplied so withConf(...) will not overwrite it.
  this.isDictionaryEncodingSet = true;
  this.encodingPropsBuilder.withDictionaryEncoding(enableDictionary);
  return self();
}
Expand All @@ -531,6 +607,7 @@ public SELF withByteStreamSplitEncoding(boolean enableByteStreamSplit) {
* @return this builder for method chaining.
*/
public SELF withDictionaryEncoding(String columnPath, boolean enableDictionary) {
  // Uses the same flag as the file-wide overload, so after this call
  // withConf(...) skips both the file-wide and the per-column dictionary
  // settings found in the Configuration.
  this.isDictionaryEncodingSet = true;
  this.encodingPropsBuilder.withDictionaryEncoding(columnPath, enableDictionary);
  return self();
}
Expand All @@ -541,6 +618,7 @@ public SELF withDictionaryEncoding(String columnPath, boolean enableDictionary)
* @return this builder for method chaining.
*/
public SELF enableValidation() {
  // Counts as an explicit validation choice for withConf(...).
  this.isEnableValidationSet = true;
  final boolean enabled = true;
  this.enableValidation = enabled;
  return self();
}
Expand All @@ -552,6 +630,7 @@ public SELF enableValidation() {
* @return this builder for method chaining.
*/
public SELF withValidation(boolean enableValidation) {
  // Record the explicit choice first; withConf(...) checks this flag before
  // replacing the validation setting with the one from the Configuration.
  this.isEnableValidationSet = true;
  this.enableValidation = enableValidation;
  return self();
}
Expand All @@ -564,6 +643,7 @@ public SELF withValidation(boolean enableValidation) {
* @return this builder for method chaining.
*/
public SELF withWriterVersion(WriterVersion version) {
  // Mark the writer version as caller-supplied so withConf(...) will not overwrite it.
  this.isWriterVersionSet = true;
  this.encodingPropsBuilder.withWriterVersion(version);
  return self();
}
Expand All @@ -574,6 +654,7 @@ public SELF withWriterVersion(WriterVersion version) {
* @return this builder for method chaining.
*/
public SELF enablePageWriteChecksum() {
  // Counts as an explicit page-checksum choice for withConf(...).
  this.isPageWriteChecksumSet = true;
  final boolean enabled = true;
  this.encodingPropsBuilder.withPageWriteChecksumEnabled(enabled);
  return self();
}
Expand All @@ -585,6 +666,7 @@ public SELF enablePageWriteChecksum() {
* @return this builder for method chaining.
*/
public SELF withPageWriteChecksumEnabled(boolean enablePageWriteChecksum) {
  // Mark the page-checksum setting as caller-supplied so withConf(...) will not overwrite it.
  this.isPageWriteChecksumSet = true;
  this.encodingPropsBuilder.withPageWriteChecksumEnabled(enablePageWriteChecksum);
  return self();
}
Expand All @@ -596,6 +678,7 @@ public SELF withPageWriteChecksumEnabled(boolean enablePageWriteChecksum) {
* @return this builder for method chaining
*/
public SELF withMaxBloomFilterBytes(int maxBloomFilterBytes) {
  // Mark the bloom filter byte cap as caller-supplied so withConf(...) will not overwrite it.
  this.isMaxBloomFilterBytesSet = true;
  this.encodingPropsBuilder.withMaxBloomFilterBytes(maxBloomFilterBytes);
  return self();
}
Expand All @@ -609,12 +692,14 @@ public SELF withMaxBloomFilterBytes(int maxBloomFilterBytes) {
* @return this builder for method chaining.
*/
public SELF withBloomFilterNDV(String columnPath, long ndv) {
  // Once set, withConf(...) no longer registers the per-column NDV handler, so
  // NDV entries in the Configuration are ignored for every column.
  this.isBloomFilterNDVSet = true;
  this.encodingPropsBuilder.withBloomFilterNDV(columnPath, ndv);
  return self();
}

public SELF withBloomFilterFPP(String columnPath, double fpp) {
  // Once set, withConf(...) no longer registers the per-column FPP handler, so
  // FPP entries in the Configuration are ignored for every column.
  this.isBloomFilterFPPSet = true;
  this.encodingPropsBuilder.withBloomFilterFPP(columnPath, fpp);
  return self();
}
Expand All @@ -626,6 +711,7 @@ public SELF withBloomFilterFPP(String columnPath, double fpp) {
* @param enabled whether to enable the adaptive bloom filter
*/
public SELF withAdaptiveBloomFilterEnabled(boolean enabled) {
  // Mark the adaptive bloom filter setting as caller-supplied so withConf(...)
  // will not overwrite it.
  this.isAdaptiveBloomFilterSet = true;
  this.encodingPropsBuilder.withAdaptiveBloomFilterEnabled(enabled);
  return self();
}
Expand All @@ -637,6 +723,7 @@ public SELF withAdaptiveBloomFilterEnabled(boolean enabled) {
* @param number the number of candidates
*/
public SELF withBloomFilterCandidateNumber(String columnPath, int number) {
  // Once set, withConf(...) no longer registers the per-column candidate-number
  // handler, so those entries in the Configuration are ignored for every column.
  this.isBloomFilterCandidateNumberSet = true;
  this.encodingPropsBuilder.withBloomFilterCandidatesNumber(columnPath, number);
  return self();
}
Expand All @@ -648,6 +735,7 @@ public SELF withBloomFilterCandidateNumber(String columnPath, int number) {
* @return this builder for method chaining
*/
public SELF withBloomFilterEnabled(boolean enabled) {
  // Mark the bloom filter switch as caller-supplied so withConf(...) will not overwrite it.
  this.isBloomFilterEnabledSet = true;
  this.encodingPropsBuilder.withBloomFilterEnabled(enabled);
  return self();
}
Expand All @@ -661,6 +749,7 @@ public SELF withBloomFilterEnabled(boolean enabled) {
* @return this builder for method chaining
*/
public SELF withBloomFilterEnabled(String columnPath, boolean enabled) {
  // Uses the same flag as the file-wide overload, so after this call
  // withConf(...) skips both the file-wide and the per-column bloom filter
  // switches found in the Configuration.
  this.isBloomFilterEnabledSet = true;
  this.encodingPropsBuilder.withBloomFilterEnabled(columnPath, enabled);
  return self();
}
Expand All @@ -672,6 +761,7 @@ public SELF withBloomFilterEnabled(String columnPath, boolean enabled) {
* @return this builder for method chaining
*/
public SELF withMinRowCountForPageSizeCheck(int min) {
  // Mark the minimum row count as caller-supplied so withConf(...) will not overwrite it.
  this.isMinRowCountForPageSizeCheckSet = true;
  this.encodingPropsBuilder.withMinRowCountForPageSizeCheck(min);
  return self();
}
Expand All @@ -683,6 +773,7 @@ public SELF withMinRowCountForPageSizeCheck(int min) {
* @return this builder for method chaining
*/
public SELF withMaxRowCountForPageSizeCheck(int max) {
  // Mark the maximum row count as caller-supplied so withConf(...) will not overwrite it.
  this.isMaxRowCountForPageSizeCheckSet = true;
  this.encodingPropsBuilder.withMaxRowCountForPageSizeCheck(max);
  return self();
}
Expand All @@ -694,6 +785,7 @@ public SELF withMaxRowCountForPageSizeCheck(int max) {
* @return this builder for method chaining
*/
public SELF withColumnIndexTruncateLength(int length) {
  // Mark the column index truncate length as caller-supplied so withConf(...)
  // will not overwrite it.
  this.isColumnIndexTruncateLengthSet = true;
  this.encodingPropsBuilder.withColumnIndexTruncateLength(length);
  return self();
}
Expand All @@ -705,6 +797,7 @@ public SELF withColumnIndexTruncateLength(int length) {
* @return this builder for method chaining
*/
public SELF withStatisticsTruncateLength(int length) {
  // Mark the statistics truncate length as caller-supplied so withConf(...)
  // will not overwrite it.
  this.isStatisticsTruncateLengthSet = true;
  this.encodingPropsBuilder.withStatisticsTruncateLength(length);
  return self();
}
Expand Down
Loading

0 comments on commit f1a074a

Please sign in to comment.