From 009a63d45bc3555ef70654003cc3b959c21cb3c6 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Sun, 10 Nov 2024 23:17:14 +0800 Subject: [PATCH 1/2] GH-3055: Disable column statistics for all columns by configuration --- .../parquet/column/ParquetProperties.java | 16 +++++- parquet-hadoop/README.md | 7 +++ .../parquet/hadoop/ParquetOutputFormat.java | 12 ++++- .../apache/parquet/hadoop/ParquetWriter.java | 12 +++++ .../statistics/DataGenerationContext.java | 54 +++++++++++++++---- .../parquet/statistics/TestStatistics.java | 49 ++++++++++++++++- 6 files changed, 137 insertions(+), 13 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index a9f80406a1..1fd7ed9bc8 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -111,6 +111,7 @@ public static WriterVersion fromString(String name) { private final ValuesWriterFactory valuesWriterFactory; private final int columnIndexTruncateLength; private final int statisticsTruncateLength; + private final boolean statisticsEnabled; // The expected NDV (number of distinct values) for each columns private final ColumnProperty bloomFilterNDVs; @@ -141,6 +142,7 @@ private ParquetProperties(Builder builder) { this.valuesWriterFactory = builder.valuesWriterFactory; this.columnIndexTruncateLength = builder.columnIndexTruncateLength; this.statisticsTruncateLength = builder.statisticsTruncateLength; + this.statisticsEnabled = builder.statisticsEnabled; this.bloomFilterNDVs = builder.bloomFilterNDVs.build(); this.bloomFilterFPPs = builder.bloomFilterFPPs.build(); this.bloomFilterEnabled = builder.bloomFilterEnabled.build(); @@ -334,7 +336,13 @@ public Map getExtraMetaData() { } public boolean getStatisticsEnabled(ColumnDescriptor column) { - return statistics.getValue(column); + // First 
check column-specific setting + Boolean columnSetting = statistics.getValue(column); + if (columnSetting != null) { + return columnSetting; + } + // Fall back to global setting + return statisticsEnabled; } @Override @@ -369,6 +377,7 @@ public static class Builder { private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY; private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH; private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH; + private boolean statisticsEnabled = DEFAULT_STATISTICS_ENABLED; private final ColumnProperty.Builder bloomFilterNDVs; private final ColumnProperty.Builder bloomFilterFPPs; private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES; @@ -679,6 +688,11 @@ public Builder withStatisticsEnabled(String columnPath, boolean enabled) { return this; } + public Builder withStatisticsEnabled(boolean enabled) { + this.statisticsEnabled = enabled; + return this; + } + public ParquetProperties build() { ParquetProperties properties = new ParquetProperties(this); // we pass a constructed but uninitialized factory to ParquetProperties above as currently diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md index 47a89a047c..c00fea2439 100644 --- a/parquet-hadoop/README.md +++ b/parquet-hadoop/README.md @@ -509,3 +509,10 @@ If `true` then an attempt will be made to dynamically load the relevant classes; if not found then the library will use the classic non-vectored reads: it is safe to enable this option on older releases. **Default value:** `false` +--- + +**Property:** `parquet.column.statistics.enabled` +**Description:** Whether to enable column statistics collection. +If `true`, statistics will be collected for all columns unless explicitly disabled for specific columns. +If `false`, statistics will be disabled for all columns regardless of column-specific settings. 
+**Default value:** `true` diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index 801da05018..b16d743a98 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -156,6 +156,7 @@ public static enum JobSummaryLevel { public static final String BLOOM_FILTER_CANDIDATES_NUMBER = "parquet.bloom.filter.candidates.number"; public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit"; public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled"; + public static final String STATISTICS_ENABLED = "parquet.column.statistics.enabled"; public static JobSummaryLevel getJobSummaryLevel(Configuration conf) { String level = conf.get(JOB_SUMMARY_LEVEL); @@ -388,6 +389,14 @@ public static boolean getPageWriteChecksumEnabled(Configuration conf) { return conf.getBoolean(PAGE_WRITE_CHECKSUM_ENABLED, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED); } + public static void setStatisticsEnabled(JobContext jobContext, boolean enabled) { + getConfiguration(jobContext).setBoolean(STATISTICS_ENABLED, enabled); + } + + public static boolean getStatisticsEnabled(Configuration conf) { + return conf.getBoolean(STATISTICS_ENABLED, ParquetProperties.DEFAULT_STATISTICS_ENABLED); + } + private WriteSupport writeSupport; private ParquetOutputCommitter committer; @@ -463,7 +472,8 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp .withBloomFilterEnabled(getBloomFilterEnabled(conf)) .withAdaptiveBloomFilterEnabled(getAdaptiveBloomFilterEnabled(conf)) .withPageRowCountLimit(getPageRowCountLimit(conf)) - .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf)); + .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf)) + 
.withStatisticsEnabled(getStatisticsEnabled(conf)); new ColumnConfigParser() .withColumnConfig( ENABLE_DICTIONARY, key -> conf.getBoolean(key, false), propsBuilder::withDictionaryEncoding) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 4fa6d96be4..f3c50c16c3 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -906,6 +906,18 @@ public SELF withStatisticsEnabled(String columnPath, boolean enabled) { return self(); } + /** + * Sets whether statistics are enabled globally. When disabled, statistics will not be collected + * for any column unless explicitly enabled for specific columns. + * + * @param enabled whether to collect statistics globally + * @return this builder for method chaining + */ + public SELF withStatisticsEnabled(boolean enabled) { + encodingPropsBuilder.withStatisticsEnabled(enabled); + return self(); + } + /** * Build a {@link ParquetWriter} with the accumulated configuration. 
* diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/DataGenerationContext.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/DataGenerationContext.java index 4e6b2a1b60..b76a8bfd0f 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/DataGenerationContext.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/DataGenerationContext.java @@ -42,6 +42,30 @@ public abstract static class WriteContext { protected final boolean enableValidation; protected final ParquetProperties.WriterVersion version; protected final Set disableColumnStatistics; + protected final boolean disableAllStatistics; + + public WriteContext( + File path, + MessageType schema, + int blockSize, + int pageSize, + boolean enableDictionary, + boolean enableValidation, + ParquetProperties.WriterVersion version, + Set disableColumnStatistics, + boolean disableAllStatistics) + throws IOException { + this.path = path; + this.fsPath = new Path(path.toString()); + this.schema = schema; + this.blockSize = blockSize; + this.pageSize = pageSize; + this.enableDictionary = enableDictionary; + this.enableValidation = enableValidation; + this.version = version; + this.disableColumnStatistics = disableColumnStatistics; + this.disableAllStatistics = disableAllStatistics; + } public WriteContext( File path, @@ -52,7 +76,16 @@ public WriteContext( boolean enableValidation, ParquetProperties.WriterVersion version) throws IOException { - this(path, schema, blockSize, pageSize, enableDictionary, enableValidation, version, ImmutableSet.of()); + this( + path, + schema, + blockSize, + pageSize, + enableDictionary, + enableValidation, + version, + ImmutableSet.of(), + false); } public WriteContext( @@ -65,15 +98,16 @@ public WriteContext( ParquetProperties.WriterVersion version, Set disableColumnStatistics) throws IOException { - this.path = path; - this.fsPath = new Path(path.toString()); - this.schema = schema; - this.blockSize = blockSize; - 
this.pageSize = pageSize; - this.enableDictionary = enableDictionary; - this.enableValidation = enableValidation; - this.version = version; - this.disableColumnStatistics = disableColumnStatistics; + this( + path, + schema, + blockSize, + pageSize, + enableDictionary, + enableValidation, + version, + disableColumnStatistics, + false); } public abstract void write(ParquetWriter writer) throws IOException; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java index c3aecb6585..abca17edea 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java @@ -317,7 +317,29 @@ public DataContext( ParquetProperties.WriterVersion version, Set disableColumnStatistics) throws IOException { - super(path, buildSchema(seed), blockSize, pageSize, enableDictionary, true, version); + this(seed, path, blockSize, pageSize, enableDictionary, version, disableColumnStatistics, false); + } + + public DataContext( + long seed, + File path, + int blockSize, + int pageSize, + boolean enableDictionary, + ParquetProperties.WriterVersion version, + Set disableColumnStatistics, + boolean disableAllStatistics) + throws IOException { + super( + path, + buildSchema(seed), + blockSize, + pageSize, + enableDictionary, + true, + version, + disableColumnStatistics, + disableAllStatistics); this.random = new Random(seed); this.recordCount = random.nextInt(MAX_TOTAL_ROWS); @@ -643,4 +665,29 @@ public void testDisableStatistics() throws IOException { DataGenerationContext.writeAndTest(test); } } + + @Test + public void testGlobalStatisticsDisabled() throws IOException { + File file = folder.newFile("test_file_global_stats_disabled.parquet"); + file.delete(); + + LOG.info(String.format("RANDOM SEED: %s", RANDOM_SEED)); + Random random = new Random(RANDOM_SEED); + + int blockSize 
= (random.nextInt(54) + 10) * MEGABYTE; + int pageSize = (random.nextInt(10) + 1) * MEGABYTE; + + // Create context with global statistics disabled + DataContext context = new DataContext( + random.nextLong(), + file, + blockSize, + pageSize, + true, // enable dictionary + ParquetProperties.WriterVersion.PARQUET_2_0, + ImmutableSet.of(), // no specific column statistics disabled + true); // disable all statistics globally + + DataGenerationContext.writeAndTest(context); + } } From b9c22c1d3436a0b7d411756496da5b02555654af Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Wed, 13 Nov 2024 10:57:10 +0800 Subject: [PATCH 2/2] add per column config --- parquet-hadoop/README.md | 9 +++++++++ .../parquet/hadoop/ParquetOutputFormat.java | 16 ++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md index c00fea2439..5817069f4b 100644 --- a/parquet-hadoop/README.md +++ b/parquet-hadoop/README.md @@ -515,4 +515,13 @@ if not found then the library will use the classic non-vectored reads: it is saf **Description:** Whether to enable column statistics collection. If `true`, statistics will be collected for all columns unless explicitly disabled for specific columns. If `false`, statistics will be disabled for all columns regardless of column-specific settings. +It is possible to enable or disable statistics for specific columns by appending `#` followed by the column path. 
**Default value:** `true` +**Example:** +```java +// Enable statistics for all columns +conf.setBoolean("parquet.column.statistics.enabled", true); +// Disable statistics for 'column.path' +conf.setBoolean("parquet.column.statistics.enabled#column.path", false); +// The final configuration will be: Enable statistics for all columns except 'column.path' +``` diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index b16d743a98..c13781a685 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -397,6 +397,18 @@ public static boolean getStatisticsEnabled(Configuration conf) { return conf.getBoolean(STATISTICS_ENABLED, ParquetProperties.DEFAULT_STATISTICS_ENABLED); } + public static void setStatisticsEnabled(JobContext jobContext, String columnPath, boolean enabled) { getConfiguration(jobContext).set(STATISTICS_ENABLED + "#" + columnPath, String.valueOf(enabled)); } + + public static boolean getStatisticsEnabled(Configuration conf, String columnPath) { String columnSpecific = conf.get(STATISTICS_ENABLED + "#" + columnPath); if (columnSpecific != null) { return Boolean.parseBoolean(columnSpecific); } return conf.getBoolean(STATISTICS_ENABLED, ParquetProperties.DEFAULT_STATISTICS_ENABLED); } + private WriteSupport writeSupport; private ParquetOutputCommitter committer; @@ -489,6 +501,10 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp BLOOM_FILTER_CANDIDATES_NUMBER, key -> conf.getInt(key, ParquetProperties.DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER), propsBuilder::withBloomFilterCandidatesNumber) + .withColumnConfig( STATISTICS_ENABLED, key -> conf.getBoolean(key, ParquetProperties.DEFAULT_STATISTICS_ENABLED), propsBuilder::withStatisticsEnabled) .parseConfig(conf); ParquetProperties props =
propsBuilder.build();