[apacheGH-3035][ParquetRewriter] extend tests for renaming feature
maxim_konstantinov committed Nov 6, 2024
1 parent 4f1f81e commit d78132d
Showing 3 changed files with 100 additions and 66 deletions.
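For context, the renaming feature these tests extend is driven through `RewriteOptions.Builder#renameColumns`, which maps a top-level source column name to its new name. Below is a minimal sketch of how a caller might wire it up, assuming the single-file `Builder(Configuration, Path, Path)` entry point and placeholder paths; the tests in this commit go through their own `createBuilder` helper instead.

```java
import java.util.Collections;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class RenameColumnExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder locations; substitute real input/output files.
        Path input = new Path("/tmp/input.parquet");
        Path output = new Path("/tmp/output.parquet");

        // Rename the top-level column "Name" to "NameRenamed" in the rewritten file.
        Map<String, String> renameColumns = Collections.singletonMap("Name", "NameRenamed");

        RewriteOptions options = new RewriteOptions.Builder(conf, input, output)
                .renameColumns(renameColumns)
                .build();

        ParquetRewriter rewriter = new ParquetRewriter(options);
        rewriter.processBlocks();
        rewriter.close();
    }
}
```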
@@ -157,8 +157,8 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
this.overwriteInputWithJoinColumns = options.getOverwriteInputWithJoinColumns();
this.renamedColumns = options.gerRenameColumns();
ParquetConfiguration conf = options.getParquetConfiguration();
inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), conf));
this.inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
this.inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), conf));
this.outSchema = pruneColumnsInSchema(getSchema(), options.getPruneColumns());
this.extraMetaData = getExtraMetadata(options);
ensureSameSchema(inputFiles);
@@ -186,7 +186,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
}

ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
writer = new ParquetFileWriter(
this.writer = new ParquetFileWriter(
out,
renamedColumns.isEmpty() ? outSchema : getSchemaWithRenamedColumns(this.outSchema),
writerMode,
@@ -202,7 +202,8 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
this.nullColumnEncryptor = null;
} else {
this.nullColumnEncryptor = new InternalFileEncryptor(options.getFileEncryptionProperties());
List<ColumnDescriptor> columns = outSchema.getColumns();
List<ColumnDescriptor> columns =
getSchemaWithRenamedColumns(this.outSchema).getColumns();
for (int i = 0; i < columns.size(); i++) {
writer.getEncryptor()
.getColumnSetup(ColumnPath.get(columns.get(i).getPath()), true, i);
@@ -225,8 +226,8 @@ public ParquetRewriter(
this.writer = writer;
this.outSchema = outSchema;
this.newCodecName = codecName;
extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData());
extraMetaData.put(
this.extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData());
this.extraMetaData.put(
ORIGINAL_CREATED_BY_KEY,
originalCreatedBy != null
? originalCreatedBy
@@ -492,9 +493,9 @@ private void processBlock(
throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted");
}

ColumnChunkMetaData chunkColumnsNormalized = chunk;
ColumnChunkMetaData chunkNormalized = chunk;
if (!renamedColumns.isEmpty()) {
chunkColumnsNormalized =
chunkNormalized =
chunk.copy(normalizeFieldsInPath(chunk.getPath()), normalizeNameInType(chunk.getPrimitiveType()));
}

@@ -558,12 +559,7 @@ private void processBlock(
ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
writer.appendColumnChunk(
descriptorRenamed,
reader.getStream(),
chunkColumnsNormalized,
bloomFilter,
columnIndex,
offsetIndex);
descriptorRenamed, reader.getStream(), chunkNormalized, bloomFilter, columnIndex, offsetIndex);
}
}

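In the ParquetRewriter constructor changes above, the output writer and the null-column encryptor are set up against `getSchemaWithRenamedColumns(this.outSchema)`, while `processBlock` normalizes chunk paths and types back to the original names before appending. As a rough illustration only (not the actual private helper), renaming a top-level field of a `MessageType` amounts to rebuilding the schema with the mapped names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class SchemaRenameSketch {
    // Illustrative sketch: rebuilds a MessageType, substituting the mapped name for
    // renamed top-level fields. Simplified -- it drops logical-type annotations,
    // field ids, and fixed lengths, which a real implementation would preserve.
    static MessageType renameTopLevelFields(MessageType schema, Map<String, String> renameColumns) {
        List<Type> fields = new ArrayList<>();
        for (Type field : schema.getFields()) {
            String newName = renameColumns.get(field.getName());
            if (newName == null) {
                fields.add(field); // untouched column
            } else if (field.isPrimitive()) {
                PrimitiveType p = field.asPrimitiveType();
                fields.add(new PrimitiveType(p.getRepetition(), p.getPrimitiveTypeName(), newName));
            } else {
                // Only the top-level name of a GroupType changes; its children are kept.
                GroupType g = field.asGroupType();
                fields.add(new GroupType(g.getRepetition(), newName, g.getFields()));
            }
        }
        return new MessageType(schema.getName(), fields);
    }
}
```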
@@ -23,7 +23,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.curator.shaded.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.Preconditions;
@@ -582,11 +584,11 @@ public RewriteOptions build() {
pruneColumns,
newCodecName,
maskColumns,
(renameColumns == null
renameColumns == null
? new HashMap<>()
: renameColumns.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey, x -> x.getValue().trim()))),
.collect(Collectors.toMap(x -> x.getKey().trim(), x -> x.getValue()
.trim())),
encryptColumns,
fileEncryptionProperties,
indexCacheStrategy,
@@ -613,21 +615,24 @@ private void checkPreconditions() {
}
}

if (renameColumns != null && !renameColumns.isEmpty()) {
if (encryptColumns != null && !encryptColumns.isEmpty()) {
for (Map.Entry<String, String> entry : renameColumns.entrySet()) {
Preconditions.checkArgument(
!encryptColumns.contains(entry.getKey()), "Cannot prune and rename same column");
}
}
for (Map.Entry<String, String> entry : renameColumns.entrySet()) {
if (renameColumns != null) {
Set<String> nullifiedColumns = maskColumns == null
? ImmutableSet.of()
: maskColumns.entrySet().stream()
.filter(x -> x.getValue() == MaskMode.NULLIFY)
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
renameColumns.forEach((colSrc, colDst) -> {
Preconditions.checkArgument(
entry.getValue() != null && !entry.getValue().trim().isEmpty(),
"Renamed column target name can't be empty");
colSrc != null && !colSrc.trim().isEmpty(), "Renamed column source name can't be empty");
Preconditions.checkArgument(
!entry.getKey().contains(".") && !entry.getValue().contains("."),
"Renamed column name can't be nested, in case of GroupType column only a top level column can be renamed");
}
colDst != null && !colDst.trim().isEmpty(), "Renamed column target name can't be empty");
Preconditions.checkArgument(
!nullifiedColumns.contains(colSrc), "Cannot nullify and rename the same column");
Preconditions.checkArgument(
!colSrc.contains(".") && !colDst.contains("."),
"Renamed column can't be nested, in case of GroupType column only a top level column can be renamed");
});
}

if (encryptColumns != null && !encryptColumns.isEmpty()) {
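The tightened `checkPreconditions()` above trims both the source and target names and rejects renames that are blank, nested, or that collide with encryption or `MaskMode.NULLIFY` masking. A few hypothetical rename maps and how those checks would treat them (the column names here are illustrative only):

```java
import java.util.Collections;
import java.util.Map;

public class RenamePreconditionCases {
    public static void main(String[] args) {
        // Accepted: a plain top-level rename; both names are trimmed before validation.
        Map<String, String> ok = Collections.singletonMap("Name", "NameRenamed");

        // Rejected: target name is blank after trimming.
        Map<String, String> blankTarget = Collections.singletonMap("Name", "   ");

        // Rejected: nested paths; for a GroupType only the top-level column can be renamed.
        Map<String, String> nested = Collections.singletonMap("Links.Forward", "ForwardRenamed");

        // Also rejected: renaming a column that is listed in encryptColumns, or one
        // that is masked with MaskMode.NULLIFY (both covered by the new tests below).
        System.out.println(ok + ", " + blankTarget + ", " + nested);
    }
}
```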
@@ -590,58 +590,86 @@ public void testMergeTwoFilesOnlyRenameColumn() throws Exception {

Map<String, String> renameColumns = ImmutableMap.of("Name", "NameRenamed");
List<String> pruneColumns = ImmutableList.of("Gender");
String[] encryptColumns = {"DocId"};
FileEncryptionProperties fileEncryptionProperties =
EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false);
List<Path> inputPaths =
inputFiles.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList());
RewriteOptions.Builder builder = createBuilder(inputPaths);
RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy)
.renameColumns(ImmutableMap.of("Name", "NameRenamed"))
.prune(pruneColumns)
.transform(CompressionCodecName.SNAPPY)
.encrypt(Arrays.asList(encryptColumns))
.encryptionProperties(fileEncryptionProperties)
.build();

rewriter = new ParquetRewriter(options);
rewriter.processBlocks();
rewriter.close();

FileDecryptionProperties fileDecryptionProperties = EncDecProperties.getFileDecryptionProperties();

// Verify the schema reflects the renamed and pruned columns
ParquetMetadata pmd =
ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
MessageType schema = pmd.getFileMetaData().getSchema();
MessageType expectSchema = createSchemaWithRenamed();
assertEquals(expectSchema, schema);

// Verify codec has not been translated
verifyCodec(
outputFile,
new HashSet<CompressionCodecName>() {
{
add(CompressionCodecName.GZIP);
add(CompressionCodecName.UNCOMPRESSED);
}
},
null);

verifyCodec(outputFile, ImmutableSet.of(CompressionCodecName.SNAPPY), fileDecryptionProperties); // Verify codec
// Verify the merged data are not changed
validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), null, false, renameColumns);

// Verify the page index
validatePageIndex(new HashSet<>(), false, renameColumns);

// Verify original.created.by is preserved
validateCreatedBy();
validateColumnData(
new HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties, false, renameColumns);
validatePageIndex(ImmutableSet.of("DocId"), false, renameColumns); // Verify the page index
validateCreatedBy(); // Verify original.created.by is preserved
validateRowGroupRowCount();

ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties);
assertFalse(metaData.getBlocks().isEmpty());
Set<String> encryptedColumns = new HashSet<>(Arrays.asList(encryptColumns));
for (BlockMetaData blockMetaData : metaData.getBlocks()) {
List<ColumnChunkMetaData> columns = blockMetaData.getColumns();
for (ColumnChunkMetaData column : columns) {
if (encryptedColumns.contains(column.getPath().toDotString())) {
assertTrue(column.isEncrypted());
} else {
assertFalse(column.isEncrypted());
}
}
}
}

@Test(expected = InvalidSchemaException.class)
public void testMergeTwoFilesWithDifferentSchema() throws Exception {
testMergeTwoFilesWithDifferentSchemaSetup(true);
testMergeTwoFilesWithDifferentSchemaSetup(true, null, null);
}

@Test(expected = InvalidSchemaException.class)
public void testMergeTwoFilesToJoinWithDifferentSchema() throws Exception {
testMergeTwoFilesWithDifferentSchemaSetup(false);
testMergeTwoFilesWithDifferentSchemaSetup(false, null, null);
}

public void testMergeTwoFilesWithDifferentSchemaSetup(boolean wrongSchemaInInputFile) throws Exception {
@Test(expected = IllegalArgumentException.class)
public void testMergeTwoFilesWithWrongDestinationRenamedColumn() throws Exception {
testMergeTwoFilesWithDifferentSchemaSetup(
null, ImmutableMap.of("WrongColumnName", "WrongColumnNameRenamed"), null);
}

@Test(expected = IllegalArgumentException.class)
public void testMergeTwoFilesWithWrongSourceRenamedColumn() throws Exception {
testMergeTwoFilesWithDifferentSchemaSetup(null, ImmutableMap.of("Name", "DocId"), null);
}

@Test(expected = IllegalArgumentException.class)
public void testMergeTwoFilesNullifyAndRenamedSameColumn() throws Exception {
testMergeTwoFilesWithDifferentSchemaSetup(
null, ImmutableMap.of("Name", "NameRenamed"), ImmutableMap.of("Name", MaskMode.NULLIFY));
}

public void testMergeTwoFilesWithDifferentSchemaSetup(
Boolean wrongSchemaInInputFile, Map<String, String> renameColumns, Map<String, MaskMode> maskColumns)
throws Exception {
MessageType schema1 = new MessageType(
"schema",
new PrimitiveType(OPTIONAL, INT64, "DocId"),
@@ -670,27 +698,32 @@ public void testMergeTwoFilesWithDifferentSchemaSetup(boolean wrongSchemaInInput
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withWriterVersion(writerVersion)
.build());
if (wrongSchemaInInputFile) {
inputFiles.add(new TestFileBuilder(conf, schema2)
.withNumRecord(numRecord)
.withCodec("UNCOMPRESSED")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withWriterVersion(writerVersion)
.build());
} else {
inputFilesToJoin.add(new TestFileBuilder(conf, schema2)
.withNumRecord(numRecord)
.withCodec("UNCOMPRESSED")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withWriterVersion(writerVersion)
.build());
if (wrongSchemaInInputFile != null) {
if (wrongSchemaInInputFile) {
inputFiles.add(new TestFileBuilder(conf, schema2)
.withNumRecord(numRecord)
.withCodec("UNCOMPRESSED")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withWriterVersion(writerVersion)
.build());
} else {
inputFilesToJoin.add(new TestFileBuilder(conf, schema2)
.withNumRecord(numRecord)
.withCodec("UNCOMPRESSED")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withWriterVersion(writerVersion)
.build());
}
}

RewriteOptions.Builder builder = createBuilder(
inputFiles.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList()),
inputFilesToJoin.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList()),
false);
RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy).build();
RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy)
.renameColumns(renameColumns)
.mask(maskColumns)
.build();

// This should throw an exception because the schemas are different
rewriter = new ParquetRewriter(options);
