diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 41242359c6..e6c9d8e70a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -157,8 +157,8 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
         this.overwriteInputWithJoinColumns = options.getOverwriteInputWithJoinColumns();
         this.renamedColumns = options.gerRenameColumns();
         ParquetConfiguration conf = options.getParquetConfiguration();
-        inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
-        inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), conf));
+        this.inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+        this.inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), conf));
         this.outSchema = pruneColumnsInSchema(getSchema(), options.getPruneColumns());
         this.extraMetaData = getExtraMetadata(options);
         ensureSameSchema(inputFiles);
@@ -186,7 +186,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
         }

         ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
-        writer = new ParquetFileWriter(
+        this.writer = new ParquetFileWriter(
                 out,
                 renamedColumns.isEmpty() ? outSchema : getSchemaWithRenamedColumns(this.outSchema),
                 writerMode,
@@ -202,7 +202,8 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
             this.nullColumnEncryptor = null;
         } else {
             this.nullColumnEncryptor = new InternalFileEncryptor(options.getFileEncryptionProperties());
-            List<ColumnDescriptor> columns = outSchema.getColumns();
+            List<ColumnDescriptor> columns =
+                    getSchemaWithRenamedColumns(this.outSchema).getColumns();
             for (int i = 0; i < columns.size(); i++) {
                 writer.getEncryptor()
                         .getColumnSetup(ColumnPath.get(columns.get(i).getPath()), true, i);
@@ -225,8 +226,8 @@ public ParquetRewriter(
         this.writer = writer;
         this.outSchema = outSchema;
         this.newCodecName = codecName;
-        extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData());
-        extraMetaData.put(
+        this.extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData());
+        this.extraMetaData.put(
                 ORIGINAL_CREATED_BY_KEY,
                 originalCreatedBy != null
                         ? originalCreatedBy
@@ -492,9 +493,9 @@ private void processBlock(
                 throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted");
             }

-            ColumnChunkMetaData chunkColumnsNormalized = chunk;
+            ColumnChunkMetaData chunkNormalized = chunk;
             if (!renamedColumns.isEmpty()) {
-                chunkColumnsNormalized =
+                chunkNormalized =
                         chunk.copy(normalizeFieldsInPath(chunk.getPath()), normalizeNameInType(chunk.getPrimitiveType()));
             }

@@ -558,12 +559,7 @@ private void processBlock(
             ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
             OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
             writer.appendColumnChunk(
-                    descriptorRenamed,
-                    reader.getStream(),
-                    chunkColumnsNormalized,
-                    bloomFilter,
-                    columnIndex,
-                    offsetIndex);
+                    descriptorRenamed, reader.getStream(), chunkNormalized, bloomFilter, columnIndex, offsetIndex);
         }
     }

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index 2a39b0eaeb..0dc76b8a2a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -23,7 +23,9 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.curator.shaded.com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.Preconditions;
@@ -582,11 +584,11 @@ public RewriteOptions build() {
                 pruneColumns,
                 newCodecName,
                 maskColumns,
-                (renameColumns == null
+                renameColumns == null
                         ? new HashMap<>()
                         : renameColumns.entrySet().stream()
-                                .collect(Collectors.toMap(
-                                        Map.Entry::getKey, x -> x.getValue().trim()))),
+                                .collect(Collectors.toMap(x -> x.getKey().trim(), x -> x.getValue()
+                                        .trim())),
                 encryptColumns,
                 fileEncryptionProperties,
                 indexCacheStrategy,
@@ -613,21 +615,24 @@ private void checkPreconditions() {
             }
         }

-        if (renameColumns != null && !renameColumns.isEmpty()) {
-            if (encryptColumns != null && !encryptColumns.isEmpty()) {
-                for (Map.Entry<String, String> entry : renameColumns.entrySet()) {
-                    Preconditions.checkArgument(
-                            !encryptColumns.contains(entry.getKey()), "Cannot prune and rename same column");
-                }
-            }
-            for (Map.Entry<String, String> entry : renameColumns.entrySet()) {
+        if (renameColumns != null) {
+            Set<String> nullifiedColumns = maskColumns == null
+                    ? ImmutableSet.of()
+                    : maskColumns.entrySet().stream()
+                            .filter(x -> x.getValue() == MaskMode.NULLIFY)
+                            .map(Map.Entry::getKey)
+                            .collect(Collectors.toSet());
+            renameColumns.forEach((colSrc, colDst) -> {
                 Preconditions.checkArgument(
-                        entry.getValue() != null && !entry.getValue().trim().isEmpty(),
-                        "Renamed column target name can't be empty");
+                        colSrc != null && !colSrc.trim().isEmpty(), "Renamed column source name can't be empty");
                 Preconditions.checkArgument(
-                        !entry.getKey().contains(".") && !entry.getValue().contains("."),
-                        "Renamed column name can't be nested, in case of GroupType column only a top level column can be renamed");
-            }
+                        colDst != null && !colDst.trim().isEmpty(), "Renamed column target name can't be empty");
+                Preconditions.checkArgument(
+                        !nullifiedColumns.contains(colSrc), "Cannot nullify and rename the same column");
+                Preconditions.checkArgument(
+                        !colSrc.contains(".") && !colDst.contains("."),
+                        "Renamed column can't be nested, in case of GroupType column only a top level column can be renamed");
+            });
         }

         if (encryptColumns != null && !encryptColumns.isEmpty()) {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index dca7d42762..c1da97c403 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -590,18 +590,26 @@ public void testMergeTwoFilesOnlyRenameColumn() throws Exception {

         Map<String, String> renameColumns = ImmutableMap.of("Name", "NameRenamed");
         List<String> pruneColumns = ImmutableList.of("Gender");
+        String[] encryptColumns = {"DocId"};
+        FileEncryptionProperties fileEncryptionProperties =
+                EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false);
         List<Path> inputPaths =
                 inputFiles.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList());
         RewriteOptions.Builder builder = createBuilder(inputPaths);
         RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy)
                 .renameColumns(ImmutableMap.of("Name", "NameRenamed"))
                 .prune(pruneColumns)
+                .transform(CompressionCodecName.SNAPPY)
+                .encrypt(Arrays.asList(encryptColumns))
+                .encryptionProperties(fileEncryptionProperties)
                 .build();

         rewriter = new ParquetRewriter(options);
         rewriter.processBlocks();
         rewriter.close();

+        FileDecryptionProperties fileDecryptionProperties = EncDecProperties.getFileDecryptionProperties();
+
         // Verify the schema is not changed
         ParquetMetadata pmd =
                 ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
@@ -609,39 +617,59 @@ public void testMergeTwoFilesOnlyRenameColumn() throws Exception {

         MessageType expectSchema = createSchemaWithRenamed();
         assertEquals(expectSchema, schema);
-        // Verify codec has not been translated
-        verifyCodec(
-                outputFile,
-                new HashSet<CompressionCodecName>() {
-                    {
-                        add(CompressionCodecName.GZIP);
-                        add(CompressionCodecName.UNCOMPRESSED);
-                    }
-                },
-                null);
-
+        verifyCodec(outputFile, ImmutableSet.of(CompressionCodecName.SNAPPY), fileDecryptionProperties); // Verify codec
         // Verify the merged data are not changed
-        validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), null, false, renameColumns);
-
-        // Verify the page index
-        validatePageIndex(new HashSet<>(), false, renameColumns);
-
-        // Verify original.created.by is preserved
-        validateCreatedBy();
+        validateColumnData(
+                new HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties, false, renameColumns);
+        validatePageIndex(ImmutableSet.of("DocId"), false, renameColumns); // Verify the page index
+        validateCreatedBy(); // Verify original.created.by is preserved
         validateRowGroupRowCount();
+
+        ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties);
+        assertFalse(metaData.getBlocks().isEmpty());
+        Set<String> encryptedColumns = new HashSet<>(Arrays.asList(encryptColumns));
+        for (BlockMetaData blockMetaData : metaData.getBlocks()) {
+            List<ColumnChunkMetaData> columns = blockMetaData.getColumns();
+            for (ColumnChunkMetaData column : columns) {
+                if (encryptedColumns.contains(column.getPath().toDotString())) {
+                    assertTrue(column.isEncrypted());
+                } else {
+                    assertFalse(column.isEncrypted());
+                }
+            }
+        }
     }

     @Test(expected = InvalidSchemaException.class)
     public void testMergeTwoFilesWithDifferentSchema() throws Exception {
-        testMergeTwoFilesWithDifferentSchemaSetup(true);
+        testMergeTwoFilesWithDifferentSchemaSetup(true, null, null);
     }

     @Test(expected = InvalidSchemaException.class)
     public void testMergeTwoFilesToJoinWithDifferentSchema() throws Exception {
-        testMergeTwoFilesWithDifferentSchemaSetup(false);
+        testMergeTwoFilesWithDifferentSchemaSetup(false, null, null);
     }

-    public void testMergeTwoFilesWithDifferentSchemaSetup(boolean wrongSchemaInInputFile) throws Exception {
+    @Test(expected = IllegalArgumentException.class)
+    public void testMergeTwoFilesWithWrongDestinationRenamedColumn() throws Exception {
+        testMergeTwoFilesWithDifferentSchemaSetup(
+                null, ImmutableMap.of("WrongColumnName", "WrongColumnNameRenamed"), null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testMergeTwoFilesWithWrongSourceRenamedColumn() throws Exception {
+        testMergeTwoFilesWithDifferentSchemaSetup(null, ImmutableMap.of("Name", "DocId"), null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testMergeTwoFilesNullifyAndRenamedSameColumn() throws Exception {
+        testMergeTwoFilesWithDifferentSchemaSetup(
+                null, ImmutableMap.of("Name", "NameRenamed"), ImmutableMap.of("Name", MaskMode.NULLIFY));
+    }
+
+    public void testMergeTwoFilesWithDifferentSchemaSetup(
+            Boolean wrongSchemaInInputFile, Map<String, String> renameColumns, Map<String, MaskMode> maskColumns)
+            throws Exception {
         MessageType schema1 = new MessageType(
                 "schema",
                 new PrimitiveType(OPTIONAL, INT64, "DocId"),
@@ -670,27 +698,32 @@ public void testMergeTwoFilesWithDifferentSchemaSetup(boolean wrongSchemaInInput
                 .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
                 .withWriterVersion(writerVersion)
                 .build());
-        if (wrongSchemaInInputFile) {
-            inputFiles.add(new TestFileBuilder(conf, schema2)
-                    .withNumRecord(numRecord)
-                    .withCodec("UNCOMPRESSED")
-                    .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
-                    .withWriterVersion(writerVersion)
-                    .build());
-        } else {
-            inputFilesToJoin.add(new TestFileBuilder(conf, schema2)
-                    .withNumRecord(numRecord)
-                    .withCodec("UNCOMPRESSED")
-                    .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
-                    .withWriterVersion(writerVersion)
-                    .build());
+        if (wrongSchemaInInputFile != null) {
+            if (wrongSchemaInInputFile) {
+                inputFiles.add(new TestFileBuilder(conf, schema2)
+                        .withNumRecord(numRecord)
+                        .withCodec("UNCOMPRESSED")
+                        .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+                        .withWriterVersion(writerVersion)
+                        .build());
+            } else {
+                inputFilesToJoin.add(new TestFileBuilder(conf, schema2)
+                        .withNumRecord(numRecord)
+                        .withCodec("UNCOMPRESSED")
+                        .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+                        .withWriterVersion(writerVersion)
+                        .build());
+            }
         }

         RewriteOptions.Builder builder = createBuilder(
                 inputFiles.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList()),
                 inputFilesToJoin.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList()),
                 false);
-        RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy).build();
+        RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy)
+                .renameColumns(renameColumns)
+                .mask(maskColumns)
+                .build();

         // This should throw an exception because the schemas are different
         rewriter = new ParquetRewriter(options);
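
Usage note (not part of the patch): a minimal sketch of the rename path exercised by the tests above, using only the rewriter calls that appear in this diff plus the public RewriteOptions.Builder(Configuration, Path, Path) constructor. The input/output paths are illustrative assumptions, not taken from the change.

    import java.util.Arrays;
    import java.util.Collections;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;

    public class RenameColumnSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Hypothetical locations, for illustration only.
            Path input = new Path("/tmp/input.parquet");
            Path output = new Path("/tmp/output-renamed.parquet");

            // Rename "Name" to "NameRenamed", drop "Gender", and re-compress with SNAPPY,
            // mirroring what testMergeTwoFilesOnlyRenameColumn drives through createBuilder().
            RewriteOptions options = new RewriteOptions.Builder(conf, input, output)
                    .renameColumns(Collections.singletonMap("Name", "NameRenamed"))
                    .prune(Arrays.asList("Gender"))
                    .transform(CompressionCodecName.SNAPPY)
                    .build();

            // Rewrite all row groups into the output file, then release the readers/writer.
            ParquetRewriter rewriter = new ParquetRewriter(options);
            rewriter.processBlocks();
            rewriter.close();
        }
    }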