From a4b253ef9f210630f771f44cc391e47c6dc4ddc4 Mon Sep 17 00:00:00 2001
From: amousavigourabi <28668597+amousavigourabi@users.noreply.github.com>
Date: Mon, 16 Oct 2023 17:19:01 +0200
Subject: [PATCH] PARQUET-2351: Set options with Configuration

---
 .../parquet/hadoop/ParquetOutputFormat.java   | 382 +++++++++++++++++-
 .../apache/parquet/hadoop/ParquetWriter.java  |  82 +++-
 .../dictionarylevel/DictionaryFilterTest.java |  35 +-
 3 files changed, 478 insertions(+), 21 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index dc23802cf8..24333bb29a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -52,11 +52,11 @@
 /**
  * OutputFormat to write to a Parquet file
- *
+ * <p>
  * It requires a {@link WriteSupport} to convert the actual records to the underlying format.
  * It requires the schema of the incoming records. (provided by the write support)
  * It allows storing extra metadata in the footer (for example: for schema compatibility purpose when converting from a different schema language).
- *
+ * <p>
  * The format configuration settings in the job configuration:
  * <pre>
* # The block size is the size of a row group being buffered in memory @@ -106,7 +106,7 @@ public class ParquetOutputFormatextends FileOutputFormat { private static final Logger LOG = LoggerFactory.getLogger(ParquetOutputFormat.class); - public static enum JobSummaryLevel { + public enum JobSummaryLevel { /** * Write no summary files */ @@ -130,7 +130,7 @@ public static enum JobSummaryLevel { public static final String ENABLE_JOB_SUMMARY = "parquet.enable.summary-metadata"; /** - * Must be one of the values in {@link JobSummaryLevel} (case insensitive) + * Must be one of the values in {@link JobSummaryLevel} (case-insensitive) */ public static final String JOB_SUMMARY_LEVEL = "parquet.summary.metadata.level"; public static final String BLOCK_SIZE = "parquet.block.size"; @@ -159,6 +159,12 @@ public static enum JobSummaryLevel { public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit"; public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled"; + /** + * Get the job summary level from a {@link Configuration}. + * + * @param conf the configuration to examine + * @return the job summary level set in the configuration + */ public static JobSummaryLevel getJobSummaryLevel(Configuration conf) { String level = conf.get(JOB_SUMMARY_LEVEL); String deprecatedFlag = conf.get(ENABLE_JOB_SUMMARY); @@ -176,219 +182,513 @@ public static JobSummaryLevel getJobSummaryLevel(Configuration conf) { } if (deprecatedFlag != null) { - return Boolean.valueOf(deprecatedFlag) ? JobSummaryLevel.ALL : JobSummaryLevel.NONE; + return Boolean.parseBoolean(deprecatedFlag) ? JobSummaryLevel.ALL : JobSummaryLevel.NONE; } return JobSummaryLevel.ALL; } + /** + * Set the write support class in the {@link Job}s {@link Configuration}. + * + * @param job the job to set the write support class for + * @param writeSupportClass the write support class to set + */ public static void setWriteSupportClass(Job job, Class> writeSupportClass) { getConfiguration(job).set(WRITE_SUPPORT_CLASS, writeSupportClass.getName()); } + /** + * Set the write support class in the {@link JobConf}. + * + * @param job the job configuration to set the write support class for + * @param writeSupportClass the write support class to set + */ public static void setWriteSupportClass(JobConf job, Class> writeSupportClass) { job.set(WRITE_SUPPORT_CLASS, writeSupportClass.getName()); } + /** + * Gets the write support class from a {@link Configuration}. + * + * @param configuration the configuration to get the write support class for + */ public static Class> getWriteSupportClass(Configuration configuration) { final String className = configuration.get(WRITE_SUPPORT_CLASS); if (className == null) { return null; } - final Class> writeSupportClass = ConfigurationUtil.getClassFromConfig(configuration, WRITE_SUPPORT_CLASS, WriteSupport.class); - return writeSupportClass; + return ConfigurationUtil.getClassFromConfig(configuration, WRITE_SUPPORT_CLASS, WriteSupport.class); } + /** + * Sets the block size property in a {@link Job}'s {@link Configuration}. + * + * @param job the job to update the configuration of + * @param blockSize the value to set the block size to + */ public static void setBlockSize(Job job, int blockSize) { getConfiguration(job).setInt(BLOCK_SIZE, blockSize); } + /** + * Sets the page size property in a {@link Job}'s {@link Configuration}. 
+ * + * @param job the job to update the configuration of + * @param pageSize the value to set the page size to + */ public static void setPageSize(Job job, int pageSize) { getConfiguration(job).setInt(PAGE_SIZE, pageSize); } + /** + * Sets the dictionary page size in a {@link Job}'s {@link Configuration}. + * + * @param job the job to update the configuration of + * @param pageSize the value to set the dictionary page size to + */ public static void setDictionaryPageSize(Job job, int pageSize) { getConfiguration(job).setInt(DICTIONARY_PAGE_SIZE, pageSize); } + /** + * Sets the compression codec name in a {@link Job}'s {@link Configuration}. + * + * @param job the job to update the configuration of + * @param compression the value to set the compression codec to + */ public static void setCompression(Job job, CompressionCodecName compression) { getConfiguration(job).set(COMPRESSION, compression.name()); } + /** + * Sets the enable dictionary property in a {@link Job}'s {@link Configuration}. + * + * @param job the job to update the configuration of + * @param enableDictionary the value to set the property to + */ public static void setEnableDictionary(Job job, boolean enableDictionary) { getConfiguration(job).setBoolean(ENABLE_DICTIONARY, enableDictionary); } + /** + * Check whether dictionary is enabled in a {@link JobContext}. + * + * @param jobContext the job context to examine + * @return whether dictionary is enabled in a job context + */ public static boolean getEnableDictionary(JobContext jobContext) { return getEnableDictionary(getConfiguration(jobContext)); } + /** + * Get the bloom filter max bytes property from a {@link Configuration}. + * + * @param conf the configuration to examine + * @return the value of the bloom filter max bytes property + */ public static int getBloomFilterMaxBytes(Configuration conf) { return conf.getInt(BLOOM_FILTER_MAX_BYTES, ParquetProperties.DEFAULT_MAX_BLOOM_FILTER_BYTES); } + /** + * Check whether bloom filter is enabled in a {@link Configuration}. + * + * @param conf the configuration to examine + * @return whether bloom filter is enabled + */ public static boolean getBloomFilterEnabled(Configuration conf) { return conf.getBoolean(BLOOM_FILTER_ENABLED, DEFAULT_BLOOM_FILTER_ENABLED); } + /** + * Check whether adaptive bloom filter is enabled in a {@link Configuration}. + * + * @param conf the configuration to examine + * @return whether adaptive bloom filter is enabled + */ public static boolean getAdaptiveBloomFilterEnabled(Configuration conf) { return conf.getBoolean(ADAPTIVE_BLOOM_FILTER_ENABLED, DEFAULT_ADAPTIVE_BLOOM_FILTER_ENABLED); } + /** + * Get the block size property from a {@link JobContext}. + * + * @param jobContext the job context to examine + * @return the block size property of the job context's {@link Configuration} + */ public static int getBlockSize(JobContext jobContext) { return getBlockSize(getConfiguration(jobContext)); } + /** + * Get the page size property from a {@link JobContext}. + * + * @param jobContext the job context to examine + * @return the page size property of the job context's {@link Configuration} + */ public static int getPageSize(JobContext jobContext) { return getPageSize(getConfiguration(jobContext)); } + /** + * Get the dictionary page property from a {@link JobContext}. 
+ * + * @param jobContext the job context to examine + * @return the dictionary page size property of the job context's {@link Configuration} + */ public static int getDictionaryPageSize(JobContext jobContext) { return getDictionaryPageSize(getConfiguration(jobContext)); } + /** + * Get the Parquet compression from a {@link JobContext}. + * + * @param jobContext the job context to examine + * @return the Parquet compression set in the job context's {@link Configuration} + */ public static CompressionCodecName getCompression(JobContext jobContext) { return getCompression(getConfiguration(jobContext)); } + /** + * Check whether the Parquet compression is set for a {@link JobContext}. + * + * @param jobContext the job context to examine + * @return whether the Parquet compression of the job context's {@link Configuration} is set + */ public static boolean isCompressionSet(JobContext jobContext) { return isCompressionSet(getConfiguration(jobContext)); } + /** + * Sets the validation property in a {@link JobContext}. + * + * @param jobContext the context to update + * @param validating the value to set the property to + */ public static void setValidation(JobContext jobContext, boolean validating) { setValidation(getConfiguration(jobContext), validating); } + /** + * Get the validation property from a {@link JobContext}. + * + * @param jobContext the job context to examine + * @return the validation property of the job context's {@link Configuration} + */ public static boolean getValidation(JobContext jobContext) { return getValidation(getConfiguration(jobContext)); } + /** + * Get the enable dictionary property from a {@link Configuration}. + * + * @param configuration the configuration to examine + * @return the enable dictionary property of the configuration + */ public static boolean getEnableDictionary(Configuration configuration) { return configuration.getBoolean( ENABLE_DICTIONARY, ParquetProperties.DEFAULT_IS_DICTIONARY_ENABLED); } + /** + * Get the min row count for page size check property from a {@link Configuration}. + * + * @param configuration the configuration to examine + * @return the min row count for page size check property of the configuration + */ public static int getMinRowCountForPageSizeCheck(Configuration configuration) { return configuration.getInt(MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK); } + /** + * Get the max row count for page size check property from a {@link Configuration}. + * + * @param configuration the configuration to examine + * @return the max row count for page size check property of the configuration + */ public static int getMaxRowCountForPageSizeCheck(Configuration configuration) { return configuration.getInt(MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK, ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK); } + /** + * Get the value count threshold from a {@link Configuration}. + * + * @param configuration the configuration to examine + * @return the value count threshold property of the configuration + */ public static int getValueCountThreshold(Configuration configuration) { return configuration.getInt(PAGE_VALUE_COUNT_THRESHOLD, ParquetProperties.DEFAULT_PAGE_VALUE_COUNT_THRESHOLD); } + /** + * Get the estimate page size check from a {@link Configuration}. 
+ * + * @param configuration the configuration to examine + * @return the estimate page size check property of the configuration + */ public static boolean getEstimatePageSizeCheck(Configuration configuration) { return configuration.getBoolean(ESTIMATE_PAGE_SIZE_CHECK, ParquetProperties.DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK); } + /** + * Get the block size from a {@link Configuration} as an integer. + * + * @deprecated use {@link ParquetOutputFormat#getLongBlockSize(Configuration)} instead + * + * @param configuration the configuration to examine + * @return the block size property of the configuration as an integer + */ @Deprecated public static int getBlockSize(Configuration configuration) { return configuration.getInt(BLOCK_SIZE, DEFAULT_BLOCK_SIZE); } + /** + * Get the block size from a {@link Configuration} as a long. + * + * @param configuration the configuration to examine + * @return the block size property of the configuration as a long + */ public static long getLongBlockSize(Configuration configuration) { return configuration.getLong(BLOCK_SIZE, DEFAULT_BLOCK_SIZE); } + /** + * Get the page size from a {@link Configuration}. + * + * @param configuration the configuration to examine + * @return the page size property of the configuration + */ public static int getPageSize(Configuration configuration) { return configuration.getInt(PAGE_SIZE, ParquetProperties.DEFAULT_PAGE_SIZE); } + /** + * Get the dictionary page size from a {@link Configuration}. + * + * @param configuration the configuration to examine + * @return the dictionary page size property of the configuration + */ public static int getDictionaryPageSize(Configuration configuration) { return configuration.getInt( DICTIONARY_PAGE_SIZE, ParquetProperties.DEFAULT_DICTIONARY_PAGE_SIZE); } + /** + * Get the writer version from a {@link Configuration}. + * + * @param configuration the configuration to examine + * @return the writer version set + */ public static WriterVersion getWriterVersion(Configuration configuration) { String writerVersion = configuration.get( WRITER_VERSION, ParquetProperties.DEFAULT_WRITER_VERSION.toString()); return WriterVersion.fromString(writerVersion); } + /** + * Get the compression codec from a {@link Configuration}. + * + * @param configuration the configuration to examine + * @return the Parquet compression codec set + */ public static CompressionCodecName getCompression(Configuration configuration) { return CodecConfig.getParquetCompressionCodec(configuration); } + /** + * Checks whether the Parquet compression is set in a {@link Configuration}. + * + * @param configuration the configuration to examine + * @return whether the Parquet compression is set in the configuration + */ public static boolean isCompressionSet(Configuration configuration) { return CodecConfig.isParquetCompressionSet(configuration); } + /** + * Sets the validation property in a {@link Configuration}. + * + * @param configuration the configuration to update + * @param validating the value to set the property to + */ public static void setValidation(Configuration configuration, boolean validating) { configuration.setBoolean(VALIDATION, validating); } + /** + * Get the validation property from a {@link Configuration}. + * + * @param configuration the configuration to examine + * @return the value of the validation property + */ public static boolean getValidation(Configuration configuration) { return configuration.getBoolean(VALIDATION, false); } + /** + * Get the codec property from a {@link TaskAttemptContext}. 
+ * + * @param taskAttemptContext the task attempt context to examine + * @return the compression codec name from the task attempt context's codec configuration + */ private CompressionCodecName getCodec(TaskAttemptContext taskAttemptContext) { return CodecConfig.from(taskAttemptContext).getCodec(); } + /** + * Sets the max padding size property in a {@link JobContext}. + * + * @param jobContext the context to update + * @param maxPaddingSize the value to set the property to + */ public static void setMaxPaddingSize(JobContext jobContext, int maxPaddingSize) { setMaxPaddingSize(getConfiguration(jobContext), maxPaddingSize); } + /** + * Sets the max padding size property in a {@link Configuration}. + * + * @param conf the configuration to update + * @param maxPaddingSize the value to set the property to + */ public static void setMaxPaddingSize(Configuration conf, int maxPaddingSize) { conf.setInt(MAX_PADDING_BYTES, maxPaddingSize); } - private static int getMaxPaddingSize(Configuration conf) { + /** + * Get the max padding size property from a {@link Configuration}. + * + * @param conf the configuration to examine + * @return the value of the max padding size property + */ + public static int getMaxPaddingSize(Configuration conf) { return conf.getInt(MAX_PADDING_BYTES, ParquetWriter.MAX_PADDING_SIZE_DEFAULT); } + /** + * Sets the column index truncate length property in a {@link JobContext}. + * + * @param jobContext the context to update + * @param length the value to set the length to + */ public static void setColumnIndexTruncateLength(JobContext jobContext, int length) { setColumnIndexTruncateLength(getConfiguration(jobContext), length); } + /** + * Sets the column index truncate length property in a {@link Configuration}. + * + * @param conf the configuration to update + * @param length the value to set the length to + */ public static void setColumnIndexTruncateLength(Configuration conf, int length) { conf.setInt(COLUMN_INDEX_TRUNCATE_LENGTH, length); } - private static int getColumnIndexTruncateLength(Configuration conf) { + /** + * Get the column index truncate length property from a {@link Configuration}. + * + * @param conf the configuration to examine + * @return the value of the column index truncate length property + */ + public static int getColumnIndexTruncateLength(Configuration conf) { return conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH); } + /** + * Sets the statistics truncate length property in a {@link JobContext}. + * + * @param jobContext the context to update + * @param length the value to set the length to + */ public static void setStatisticsTruncateLength(JobContext jobContext, int length) { setStatisticsTruncateLength(getConfiguration(jobContext), length); } + /** + * Sets the statistics truncate length property in a {@link Configuration}. + * + * @param conf the configuration to update + * @param length the value to set the length to + */ private static void setStatisticsTruncateLength(Configuration conf, int length) { conf.setInt(STATISTICS_TRUNCATE_LENGTH, length); } - private static int getStatisticsTruncateLength(Configuration conf) { + /** + * Get the statistics truncate length property from a {@link Configuration}. 
+ * + * @param conf the configuration to examine + * @return the value of the statistics truncate length property + */ + public static int getStatisticsTruncateLength(Configuration conf) { return conf.getInt(STATISTICS_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH); } + /** + * Sets the page row count limit property in a {@link JobContext}. + * + * @param jobContext the context to update + * @param rowCount the value to set the property to + */ public static void setPageRowCountLimit(JobContext jobContext, int rowCount) { setPageRowCountLimit(getConfiguration(jobContext), rowCount); } + /** + * Sets the page row count limit property in a {@link Configuration}. + * + * @param conf the configuration to update + * @param rowCount the value to set the property to + */ public static void setPageRowCountLimit(Configuration conf, int rowCount) { conf.setInt(PAGE_ROW_COUNT_LIMIT, rowCount); } - private static int getPageRowCountLimit(Configuration conf) { + /** + * Get the page row count limit property from a {@link Configuration}. + * + * @param conf the configuration to examine + * @return the value of the page row count limit property + */ + public static int getPageRowCountLimit(Configuration conf) { return conf.getInt(PAGE_ROW_COUNT_LIMIT, ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT); } + /** + * Sets the page write checksum enabled property in a {@link JobContext}. + * + * @param jobContext the context to update + * @param val the value to set the property to + */ public static void setPageWriteChecksumEnabled(JobContext jobContext, boolean val) { setPageWriteChecksumEnabled(getConfiguration(jobContext), val); } + /** + * Sets the page write checksum enabled property in a {@link Configuration}. + * + * @param conf the configuration to update + * @param val the value to set the property to + */ public static void setPageWriteChecksumEnabled(Configuration conf, boolean val) { conf.setBoolean(PAGE_WRITE_CHECKSUM_ENABLED, val); } + /** + * Get the page write checksum enabled property from a {@link Configuration}. + * + * @param conf the configuration to examine + * @return the value of the page write checksum enabled property + */ public static boolean getPageWriteChecksumEnabled(Configuration conf) { return conf.getBoolean(PAGE_WRITE_CHECKSUM_ENABLED, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED); } @@ -424,6 +724,13 @@ public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptConte return getRecordWriter(taskAttemptContext, Mode.CREATE); } + /** + * Get the record writer from a {@link TaskAttemptContext}. + * + * @param taskAttemptContext the task attempt context to examine + * @param mode the mode of the record writer + * @return the record writer from the task attempt context + */ public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext, Mode mode) throws IOException, InterruptedException { @@ -435,21 +742,53 @@ public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptConte return getRecordWriter(conf, file, codec, mode); } + /** + * Get the record writer from a {@link TaskAttemptContext}. 
+ * + * @param taskAttemptContext the task attempt context to examine + * @param file the {@link Path} for the record writer + * @return the record writer from the task attempt context + */ public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext, Path file) throws IOException, InterruptedException { return getRecordWriter(taskAttemptContext, file, Mode.CREATE); } + /** + * Get the record writer from a {@link TaskAttemptContext}. + * + * @param taskAttemptContext the task attempt context to examine + * @param file the {@link Path} for the record writer + * @param mode the mode of the record writer + * @return the record writer from the task attempt context + */ public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext, Path file, Mode mode) throws IOException, InterruptedException { return getRecordWriter(getConfiguration(taskAttemptContext), file, getCodec(taskAttemptContext), mode); } + /** + * Get the record writer from a {@link Configuration}. + * + * @param conf the configuration to examine + * @param file the {@link Path} for the record writer + * @param codec the codec of the record writer + * @return the record writer from the task attempt context + */ public RecordWriter getRecordWriter(Configuration conf, Path file, CompressionCodecName codec) throws IOException, InterruptedException { return getRecordWriter(conf, file, codec, Mode.CREATE); } + /** + * Get the record writer from a {@link Configuration}. + * + * @param conf the configuration to examine + * @param file the {@link Path} for the record writer + * @param codec the codec of the record writer + * @param mode the mode of the record writer + * @return the record writer from the task attempt context + */ public RecordWriter getRecordWriter(Configuration conf, Path file, CompressionCodecName codec, Mode mode) throws IOException, InterruptedException { final WriteSupport writeSupport = getWriteSupport(conf); @@ -517,7 +856,7 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp "be reset by the new value: " + maxLoad); } - return new ParquetRecordWriter ( + return new ParquetRecordWriter<>( w, writeSupport, fileWriteContext.getSchema(), @@ -547,6 +886,12 @@ public WriteSupport getWriteSupport(Configuration configuration){ } } + /** + * Get the {@link OutputCommitter} from a {@link TaskAttemptContext}. + * + * @param context the task attempt context to examine + * @return the output committer from the task attempt context + */ @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException { @@ -562,10 +907,23 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) */ private static MemoryManager memoryManager; + /** + * Get the memory manager. + * + * @return the memory manager for all the real writers in one task + */ public synchronized static MemoryManager getMemoryManager() { return memoryManager; } + /** + * Create the {@link FileEncryptionProperties} for a file. 
+ * + * @param fileHadoopConfig the configuration to create the properties for + * @param tempFilePath the path of the file to create the properties for + * @param fileWriteContext the write context of the file to create the properties for + * @return the file's {@link FileEncryptionProperties} + */ public static FileEncryptionProperties createEncryptionProperties(Configuration fileHadoopConfig, Path tempFilePath, WriteContext fileWriteContext) { EncryptionPropertiesFactory cryptoFactory = EncryptionPropertiesFactory.loadFactory(fileHadoopConfig); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 7b78a93763..20a9cc4335 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -20,6 +20,8 @@ import java.io.Closeable; import java.io.IOException; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -133,7 +135,7 @@ public ParquetWriter( /** * Create a new ParquetWriter. - * + * * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads * configuration from the classpath. * @@ -342,7 +344,7 @@ public long getDataSize() { /** * An abstract builder class for ParquetWriter instances. - * + *
+ * <p>
  * Object models should extend this builder to provide writer configuration
  * options.
  *
@@ -359,8 +361,9 @@ public abstract static class Builder<T, SELF extends Builder<T, SELF>
> {
   private long rowGroupSize = DEFAULT_BLOCK_SIZE;
   private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT;
   private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED;
-  private ParquetProperties.Builder encodingPropsBuilder =
+  private final ParquetProperties.Builder encodingPropsBuilder =
       ParquetProperties.builder();
+  private final Set<String> setConfigurationProps = new HashSet<>(19);

   protected Builder(Path path) {
     this.path = path;
@@ -389,6 +392,55 @@ protected Builder(OutputFile path) {
    */
   public SELF withConf(Configuration conf) {
     this.conf = conf;
+    if (!setConfigurationProps.contains(ParquetOutputFormat.PAGE_SIZE))
+      encodingPropsBuilder.withPageSize(ParquetOutputFormat.getPageSize(conf));
+    if (!setConfigurationProps.contains(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT))
+      encodingPropsBuilder.withPageRowCountLimit(ParquetOutputFormat.getPageRowCountLimit(conf));
+    if (!setConfigurationProps.contains(ParquetOutputFormat.DICTIONARY_PAGE_SIZE))
+      encodingPropsBuilder.withDictionaryPageSize(ParquetOutputFormat.getDictionaryPageSize(conf));
+    if (!setConfigurationProps.contains(ParquetOutputFormat.ENABLE_DICTIONARY))
+      encodingPropsBuilder.withDictionaryEncoding(ParquetOutputFormat.getEnableDictionary(conf));
+    if (!setConfigurationProps.contains(ParquetOutputFormat.WRITER_VERSION))
+      encodingPropsBuilder.withWriterVersion(ParquetOutputFormat.getWriterVersion(conf));
+    if (!setConfigurationProps.contains(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED))
+      encodingPropsBuilder.withPageWriteChecksumEnabled(ParquetOutputFormat.getPageWriteChecksumEnabled(conf));
+    if (!setConfigurationProps.contains(ParquetOutputFormat.BLOOM_FILTER_MAX_BYTES))
+      encodingPropsBuilder.withMaxBloomFilterBytes(ParquetOutputFormat.getBloomFilterMaxBytes(conf));
+    if (!setConfigurationProps.contains(ParquetOutputFormat.ADAPTIVE_BLOOM_FILTER_ENABLED))
+      encodingPropsBuilder.withAdaptiveBloomFilterEnabled(ParquetOutputFormat.getAdaptiveBloomFilterEnabled(conf));
+    if (!setConfigurationProps.contains(ParquetOutputFormat.BLOOM_FILTER_ENABLED))
+      encodingPropsBuilder.withBloomFilterEnabled(ParquetOutputFormat.getBloomFilterEnabled(conf));
+    if (!setConfigurationProps.contains(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK))
+      encodingPropsBuilder.withMinRowCountForPageSizeCheck(ParquetOutputFormat.getMinRowCountForPageSizeCheck(conf));
+    if (!setConfigurationProps.contains(ParquetOutputFormat.MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK))
+      encodingPropsBuilder.withMaxRowCountForPageSizeCheck(ParquetOutputFormat.getMaxRowCountForPageSizeCheck(conf));
+    if (!setConfigurationProps.contains(ParquetOutputFormat.COLUMN_INDEX_TRUNCATE_LENGTH))
+      encodingPropsBuilder.withColumnIndexTruncateLength(ParquetOutputFormat.getColumnIndexTruncateLength(conf));
+    if (!setConfigurationProps.contains(ParquetOutputFormat.STATISTICS_TRUNCATE_LENGTH))
+      encodingPropsBuilder.withStatisticsTruncateLength(ParquetOutputFormat.getStatisticsTruncateLength(conf));
+    if (!setConfigurationProps.contains(ParquetOutputFormat.MAX_PADDING_BYTES))
+      maxPaddingSize = ParquetOutputFormat.getMaxPaddingSize(conf);
+    if (!setConfigurationProps.contains(ParquetOutputFormat.COMPRESSION))
+      codecName = ParquetOutputFormat.getCompression(conf);
+    if (!setConfigurationProps.contains(ParquetOutputFormat.VALIDATION))
+      enableValidation = ParquetOutputFormat.getValidation(conf);
+    ColumnConfigParser cc = new ColumnConfigParser();
+    if (!setConfigurationProps.contains(ParquetOutputFormat.ENABLE_DICTIONARY))
+      cc.withColumnConfig(ParquetOutputFormat.ENABLE_DICTIONARY, key
-> conf.getBoolean(key, ParquetOutputFormat.getEnableDictionary(conf)), encodingPropsBuilder::withDictionaryEncoding); + if (!setConfigurationProps.contains(ParquetOutputFormat.BLOOM_FILTER_ENABLED)) + cc.withColumnConfig(ParquetOutputFormat.BLOOM_FILTER_ENABLED, key -> conf.getBoolean(key, ParquetOutputFormat.getBloomFilterEnabled(conf)), + encodingPropsBuilder::withBloomFilterEnabled); + if (!setConfigurationProps.contains(ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV)) + cc.withColumnConfig(ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV, key -> conf.getLong(key, -1L), encodingPropsBuilder::withBloomFilterNDV); + if (!setConfigurationProps.contains(ParquetOutputFormat.BLOOM_FILTER_FPP)) + cc.withColumnConfig(ParquetOutputFormat.BLOOM_FILTER_FPP, key -> conf.getDouble(key, ParquetProperties.DEFAULT_BLOOM_FILTER_FPP), + encodingPropsBuilder::withBloomFilterFPP); + if (!setConfigurationProps.contains(ParquetOutputFormat.BLOOM_FILTER_CANDIDATES_NUMBER)) + cc.withColumnConfig( + ParquetOutputFormat.BLOOM_FILTER_CANDIDATES_NUMBER, + key -> conf.getInt(key, ParquetProperties.DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER), + encodingPropsBuilder::withBloomFilterCandidatesNumber); + cc.parseConfig(conf); return self(); } @@ -412,6 +464,7 @@ public SELF withWriteMode(ParquetFileWriter.Mode mode) { * @return this builder for method chaining. */ public SELF withCompressionCodec(CompressionCodecName codecName) { + setConfigurationProps.add(ParquetOutputFormat.COMPRESSION); this.codecName = codecName; return self(); } @@ -458,6 +511,7 @@ public SELF withRowGroupSize(long rowGroupSize) { * @return this builder for method chaining. */ public SELF withPageSize(int pageSize) { + setConfigurationProps.add(ParquetOutputFormat.PAGE_SIZE); encodingPropsBuilder.withPageSize(pageSize); return self(); } @@ -469,6 +523,7 @@ public SELF withPageSize(int pageSize) { * @return this builder for method chaining */ public SELF withPageRowCountLimit(int rowCount) { + setConfigurationProps.add(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT); encodingPropsBuilder.withPageRowCountLimit(rowCount); return self(); } @@ -481,6 +536,7 @@ public SELF withPageRowCountLimit(int rowCount) { * @return this builder for method chaining. */ public SELF withDictionaryPageSize(int dictionaryPageSize) { + setConfigurationProps.add(ParquetOutputFormat.DICTIONARY_PAGE_SIZE); encodingPropsBuilder.withDictionaryPageSize(dictionaryPageSize); return self(); } @@ -494,6 +550,7 @@ public SELF withDictionaryPageSize(int dictionaryPageSize) { * @return this builder for method chaining. */ public SELF withMaxPaddingSize(int maxPaddingSize) { + setConfigurationProps.add(ParquetOutputFormat.MAX_PADDING_BYTES); this.maxPaddingSize = maxPaddingSize; return self(); } @@ -504,6 +561,7 @@ public SELF withMaxPaddingSize(int maxPaddingSize) { * @return this builder for method chaining. */ public SELF enableDictionaryEncoding() { + setConfigurationProps.add(ParquetOutputFormat.ENABLE_DICTIONARY); encodingPropsBuilder.withDictionaryEncoding(true); return self(); } @@ -515,6 +573,7 @@ public SELF enableDictionaryEncoding() { * @return this builder for method chaining. */ public SELF withDictionaryEncoding(boolean enableDictionary) { + setConfigurationProps.add(ParquetOutputFormat.ENABLE_DICTIONARY); encodingPropsBuilder.withDictionaryEncoding(enableDictionary); return self(); } @@ -532,6 +591,7 @@ public SELF withByteStreamSplitEncoding(boolean enableByteStreamSplit) { * @return this builder for method chaining. 
*/ public SELF withDictionaryEncoding(String columnPath, boolean enableDictionary) { + setConfigurationProps.add(ParquetOutputFormat.ENABLE_DICTIONARY); encodingPropsBuilder.withDictionaryEncoding(columnPath, enableDictionary); return self(); } @@ -542,6 +602,7 @@ public SELF withDictionaryEncoding(String columnPath, boolean enableDictionary) * @return this builder for method chaining. */ public SELF enableValidation() { + setConfigurationProps.add(ParquetOutputFormat.VALIDATION); this.enableValidation = true; return self(); } @@ -553,6 +614,7 @@ public SELF enableValidation() { * @return this builder for method chaining. */ public SELF withValidation(boolean enableValidation) { + setConfigurationProps.add(ParquetOutputFormat.VALIDATION); this.enableValidation = enableValidation; return self(); } @@ -565,6 +627,7 @@ public SELF withValidation(boolean enableValidation) { * @return this builder for method chaining. */ public SELF withWriterVersion(WriterVersion version) { + setConfigurationProps.add(ParquetOutputFormat.WRITER_VERSION); encodingPropsBuilder.withWriterVersion(version); return self(); } @@ -575,6 +638,7 @@ public SELF withWriterVersion(WriterVersion version) { * @return this builder for method chaining. */ public SELF enablePageWriteChecksum() { + setConfigurationProps.add(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED); encodingPropsBuilder.withPageWriteChecksumEnabled(true); return self(); } @@ -586,6 +650,7 @@ public SELF enablePageWriteChecksum() { * @return this builder for method chaining. */ public SELF withPageWriteChecksumEnabled(boolean enablePageWriteChecksum) { + setConfigurationProps.add(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED); encodingPropsBuilder.withPageWriteChecksumEnabled(enablePageWriteChecksum); return self(); } @@ -597,6 +662,7 @@ public SELF withPageWriteChecksumEnabled(boolean enablePageWriteChecksum) { * @return this builder for method chaining */ public SELF withMaxBloomFilterBytes(int maxBloomFilterBytes) { + setConfigurationProps.add(ParquetOutputFormat.BLOOM_FILTER_MAX_BYTES); encodingPropsBuilder.withMaxBloomFilterBytes(maxBloomFilterBytes); return self(); } @@ -610,12 +676,14 @@ public SELF withMaxBloomFilterBytes(int maxBloomFilterBytes) { * @return this builder for method chaining. 
*/ public SELF withBloomFilterNDV(String columnPath, long ndv) { + setConfigurationProps.add(ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV); encodingPropsBuilder.withBloomFilterNDV(columnPath, ndv); return self(); } public SELF withBloomFilterFPP(String columnPath, double fpp) { + setConfigurationProps.add(ParquetOutputFormat.BLOOM_FILTER_FPP); encodingPropsBuilder.withBloomFilterFPP(columnPath, fpp); return self(); } @@ -627,6 +695,7 @@ public SELF withBloomFilterFPP(String columnPath, double fpp) { * @param enabled whether to write bloom filter for the column */ public SELF withAdaptiveBloomFilterEnabled(boolean enabled) { + setConfigurationProps.add(ParquetOutputFormat.ADAPTIVE_BLOOM_FILTER_ENABLED); encodingPropsBuilder.withAdaptiveBloomFilterEnabled(enabled); return self(); } @@ -638,6 +707,7 @@ public SELF withAdaptiveBloomFilterEnabled(boolean enabled) { * @param number the number of candidate */ public SELF withBloomFilterCandidateNumber(String columnPath, int number) { + setConfigurationProps.add(ParquetOutputFormat.BLOOM_FILTER_CANDIDATES_NUMBER); encodingPropsBuilder.withBloomFilterCandidatesNumber(columnPath, number); return self(); } @@ -649,6 +719,7 @@ public SELF withBloomFilterCandidateNumber(String columnPath, int number) { * @return this builder for method chaining */ public SELF withBloomFilterEnabled(boolean enabled) { + setConfigurationProps.add(ParquetOutputFormat.BLOOM_FILTER_ENABLED); encodingPropsBuilder.withBloomFilterEnabled(enabled); return self(); } @@ -662,6 +733,7 @@ public SELF withBloomFilterEnabled(boolean enabled) { * @return this builder for method chaining */ public SELF withBloomFilterEnabled(String columnPath, boolean enabled) { + setConfigurationProps.add(ParquetOutputFormat.BLOOM_FILTER_ENABLED); encodingPropsBuilder.withBloomFilterEnabled(columnPath, enabled); return self(); } @@ -673,6 +745,7 @@ public SELF withBloomFilterEnabled(String columnPath, boolean enabled) { * @return this builder for method chaining */ public SELF withMinRowCountForPageSizeCheck(int min) { + setConfigurationProps.add(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK); encodingPropsBuilder.withMinRowCountForPageSizeCheck(min); return self(); } @@ -684,6 +757,7 @@ public SELF withMinRowCountForPageSizeCheck(int min) { * @return this builder for method chaining */ public SELF withMaxRowCountForPageSizeCheck(int max) { + setConfigurationProps.add(ParquetOutputFormat.MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK); encodingPropsBuilder.withMaxRowCountForPageSizeCheck(max); return self(); } @@ -695,6 +769,7 @@ public SELF withMaxRowCountForPageSizeCheck(int max) { * @return this builder for method chaining */ public SELF withColumnIndexTruncateLength(int length) { + setConfigurationProps.add(ParquetOutputFormat.COLUMN_INDEX_TRUNCATE_LENGTH); encodingPropsBuilder.withColumnIndexTruncateLength(length); return self(); } @@ -706,6 +781,7 @@ public SELF withColumnIndexTruncateLength(int length) { * @return this builder for method chaining */ public SELF withStatisticsTruncateLength(int length) { + setConfigurationProps.add(ParquetOutputFormat.STATISTICS_TRUNCATE_LENGTH); encodingPropsBuilder.withStatisticsTruncateLength(length); return self(); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java index 4fa933e754..85b3ed2dc1 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java 
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
@@ -44,6 +44,7 @@
 import org.apache.parquet.filter2.predicate.Statistics;
 import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
@@ -86,7 +87,9 @@ public class DictionaryFilterTest {
   private static final int nElements = 1000;
   private static final Configuration conf = new Configuration();
   private static final Path FILE_V1 = new Path("target/test/TestDictionaryFilter/testParquetFileV1.parquet");
+  private static final Path FILE_V1_CONF = new Path("target/test/TestDictionaryFilter/testParquetFileV1Conf.parquet");
   private static final Path FILE_V2 = new Path("target/test/TestDictionaryFilter/testParquetFileV2.parquet");
+  private static final Path FILE_V2_CONF = new Path("target/test/TestDictionaryFilter/testParquetFileV2Conf.parquet");
   private static final MessageType schema = parseMessageType(
       "message test { "
           + "required binary binary_field; "
@@ -177,7 +180,9 @@ private static void writeData(SimpleGroupFactory f, ParquetWriter<Group> writer)
   public static void prepareFile() throws IOException {
     cleanup();
     prepareFile(PARQUET_1_0, FILE_V1);
+    prepareFileWithConf(PARQUET_1_0, FILE_V1_CONF);
     prepareFile(PARQUET_2_0, FILE_V2);
+    prepareFileWithConf(PARQUET_2_0, FILE_V2_CONF);
   }

   private static void prepareFile(WriterVersion version, Path file) throws IOException {
@@ -195,10 +200,28 @@ private static void prepareFile(WriterVersion version, Path file) throws IOException {
     writeData(f, writer);
   }

+  private static void prepareFileWithConf(WriterVersion version, Path file) throws IOException {
+    Configuration configuration = new Configuration();
+    configuration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, true);
+    configuration.setInt(ParquetOutputFormat.DICTIONARY_PAGE_SIZE, 2 * 1024);
+    configuration.setInt(ParquetOutputFormat.PAGE_SIZE, 1024);
+    configuration.set(ParquetOutputFormat.COMPRESSION, GZIP.name());
+    configuration.set(ParquetOutputFormat.WRITER_VERSION, version.name());
+    GroupWriteSupport.setSchema(schema, configuration);
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+        .withRowGroupSize(1024*1024)
+        .withConf(configuration)
+        .build();
+    writeData(f, writer);
+  }
+
   @AfterClass
   public static void cleanup() throws IOException {
     deleteFile(FILE_V1);
+    deleteFile(FILE_V1_CONF);
     deleteFile(FILE_V2);
+    deleteFile(FILE_V2_CONF);
   }

   private static void deleteFile(Path file) throws IOException {
@@ -209,24 +232,24 @@ private static void deleteFile(Path file) throws IOException {
   }

   @Parameters
-  public static Object[] params() {
-    return new Object[] {PARQUET_1_0, PARQUET_2_0};
+  public static Object[][] params() {
+    return new Object[][] {{PARQUET_1_0, true}, {PARQUET_2_0, true}, {PARQUET_1_0, false}, {PARQUET_2_0, false}};
   }

   List<ColumnChunkMetaData> ccmd;
   ParquetFileReader reader;
   DictionaryPageReadStore dictionaries;
   private Path file;
-  private WriterVersion version;
+  private final WriterVersion version;

-  public DictionaryFilterTest(WriterVersion version) {
+  public DictionaryFilterTest(WriterVersion version, boolean conf) {
     this.version = version;
     switch (version) {
       case PARQUET_1_0:
-        file = FILE_V1;
+        file = conf ? FILE_V1_CONF : FILE_V1;
         break;
       case PARQUET_2_0:
-        file = FILE_V2;
+        file = conf ? FILE_V2_CONF : FILE_V2;
         break;
     }
   }
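
Reviewer note (not part of the patch): below is a minimal sketch of how the new Builder#withConf behavior is meant to be used, modeled on prepareFileWithConf above. The class name, schema, and output path are illustrative assumptions, and the column-specific "key#column.path" form mirrors the ColumnConfigParser wiring added in withConf.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class WithConfExample {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required int64 id; }");

    Configuration configuration = new Configuration();
    // Values below are picked up by Builder#withConf for every option that has
    // not already been set explicitly on the builder.
    configuration.setInt(ParquetOutputFormat.PAGE_SIZE, 64 * 1024);
    configuration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, true);
    configuration.set(ParquetOutputFormat.COMPRESSION, "GZIP");
    // Column-specific override, using the "key#column.path" form handled by
    // the ColumnConfigParser calls added in withConf.
    configuration.setBoolean(ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#id", true);
    GroupWriteSupport.setSchema(schema, configuration);

    Path output = new Path("target/tmp/with-conf-example.parquet"); // illustrative path
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(output)
        // Explicit builder calls win: this runs before withConf, so it is
        // recorded in setConfigurationProps and any dictionary page size in
        // the Configuration is ignored.
        .withDictionaryPageSize(4 * 1024)
        .withConf(configuration)
        .build()) {
      writer.write(new SimpleGroupFactory(schema).newGroup().append("id", 1L));
    }
  }
}

In short: options set on the builder are tracked in setConfigurationProps and shadow the Configuration, options only present in the Configuration are applied by withConf, and everything else keeps its ParquetProperties default.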