feat: Add Parquet field_id writing (#6381)
This adds support for writing Parquet column field_ids.

This is an extraction from #6156 (which will need to expose deeper hooks
to allow Iceberg to fully control field_id resolution during reading).

This ensures we can correctly write down Iceberg tables, which must include
field_ids; this is necessary for #5989.

In addition, it was noticed that Parquet ColumnMetaData encodings were
written down in a non-deterministic order due to the use of a HashSet;
this has been updated to an EnumSet for more consistent Parquet
serialization. That determinism was necessary to test field_id writing.
devinrsmith authored Nov 19, 2024
1 parent c5018cf commit e096c1f
Showing 12 changed files with 862 additions and 78 deletions.
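
Before the file-by-file diff, a minimal usage sketch of the new hook. This is not code from the commit: it assumes the existing ParquetTools.writeTable(Table, String, ParquetInstructions) entry point and the ParquetInstructions.builder() factory, and the table, column names, field ids, and output path are all hypothetical.

import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;

public class FieldIdWriteExample {
    public static void main(String[] args) {
        // Hypothetical two-column table.
        final Table table = TableTools.emptyTable(10).update("Id = ii", "Name = `row_` + ii");
        // Attach Iceberg-style field ids to each Deephaven column before writing; the ids
        // end up as field_id on the corresponding Parquet SchemaElements.
        final ParquetInstructions instructions = ParquetInstructions.builder()
                .setFieldId("Id", 1)
                .setFieldId("Name", 2)
                .build();
        ParquetTools.writeTable(table, "/tmp/example.parquet", instructions);
    }
}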
ColumnWriterImpl.java
@@ -31,7 +31,7 @@
import java.nio.IntBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.HashSet;
import java.util.EnumSet;
import java.util.Set;

import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
@@ -57,7 +57,9 @@ final class ColumnWriterImpl implements ColumnWriter {
private final RunLengthBitPackingHybridEncoder dlEncoder;
private final RunLengthBitPackingHybridEncoder rlEncoder;
private long dictionaryOffset = -1;
private final Set<Encoding> encodings = new HashSet<>();
// The downstream writing code (ParquetFileWriter) seems to respect the traversal order of this set. As such, to
// improve determinism, we are using an EnumSet.
private final Set<Encoding> encodings = EnumSet.noneOf(Encoding.class);
private long firstDataPageOffset = -1;
private long uncompressedLength;
private long compressedLength;
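
The property relied on by the comment above, in a standalone sketch (not part of the commit): an EnumSet iterates in the enum's declaration order regardless of insertion order, whereas a HashSet's order depends on hash codes, so downstream code that walks the set serializes deterministically.

import java.util.EnumSet;
import java.util.Set;
import org.apache.parquet.column.Encoding;

public class EnumSetOrderDemo {
    public static void main(String[] args) {
        final Set<Encoding> encodings = EnumSet.noneOf(Encoding.class);
        // Insert in "wrong" order on purpose.
        encodings.add(Encoding.RLE);
        encodings.add(Encoding.PLAIN);
        // EnumSet always traverses in ordinal (declaration) order:
        // prints PLAIN, then RLE.
        encodings.forEach(System.out::println);
    }
}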
MappedSchema.java
@@ -22,6 +22,9 @@
*/
class MappedSchema {

@VisibleForTesting
static final String SCHEMA_NAME = "root";

static MappedSchema create(
final Map<String, Map<ParquetCacheTags, Object>> computedCache,
final TableDefinition definition,
@@ -32,7 +35,7 @@ static MappedSchema create(
for (final ColumnDefinition<?> columnDefinition : definition.getColumns()) {
builder.addField(createType(computedCache, columnDefinition, rowSet, columnSourceMap, instructions));
}
final MessageType schema = builder.named("root");
final MessageType schema = builder.named(SCHEMA_NAME);
return new MappedSchema(definition, schema);
}

ParquetInstructions.java
@@ -3,12 +3,14 @@
//
package io.deephaven.parquet.table;

import io.deephaven.api.util.NameValidator;
import io.deephaven.base.verify.Require;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.ColumnToCodecMappings;
import io.deephaven.hash.KeyedObjectHashMap;
import io.deephaven.hash.KeyedObjectKey;
import io.deephaven.hash.KeyedObjectKey.Basic;
import io.deephaven.parquet.base.ParquetUtils;
import io.deephaven.util.annotations.VisibleForTesting;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -21,6 +23,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -114,6 +117,14 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par
*/
public abstract boolean useDictionary(String columnName);

/**
* The field ID for the given {@code columnName}.
*
* @param columnName the Deephaven column name
* @return the field id
*/
public abstract OptionalInt getFieldId(final String columnName);

public abstract Object getSpecialInstructions();

public abstract String getCompressionCodecName();
@@ -200,6 +211,7 @@ public static boolean sameColumnNamesAndCodecMappings(final ParquetInstructions
}

public static final ParquetInstructions EMPTY = new ParquetInstructions() {

@Override
public String getParquetColumnNameFromColumnNameOrDefault(final String columnName) {
return columnName;
@@ -228,6 +240,11 @@ public boolean useDictionary(final String columnName) {
return false;
}

@Override
public OptionalInt getFieldId(String columnName) {
return OptionalInt.empty();
}

@Override
@Nullable
public Object getSpecialInstructions() {
@@ -319,14 +336,31 @@ ParquetInstructions withIndexColumns(final Collection<List<String>> indexColumns
};

private static class ColumnInstructions {

private static final KeyedObjectKey<String, ColumnInstructions> COLUMN_NAME_KEY = new Basic<>() {
@Override
public String getKey(@NotNull final ColumnInstructions columnInstructions) {
return columnInstructions.getColumnName();
}
};

private static final KeyedObjectKey<String, ColumnInstructions> PARQUET_COLUMN_NAME_KEY = new Basic<>() {
@Override
public String getKey(@NotNull final ColumnInstructions columnInstructions) {
return columnInstructions.getParquetColumnName();
}
};

private final String columnName;
private String parquetColumnName;
private String codecName;
private String codecArgs;
private boolean useDictionary;
private Integer fieldId;

public ColumnInstructions(final String columnName) {
this.columnName = columnName;
this.columnName = Objects.requireNonNull(columnName);
NameValidator.validateColumnName(columnName);
}

public String getColumnName() {
@@ -338,6 +372,12 @@ public String getParquetColumnName() {
}

public ColumnInstructions setParquetColumnName(final String parquetColumnName) {
if (this.parquetColumnName != null && !this.parquetColumnName.equals(parquetColumnName)) {
throw new IllegalArgumentException(
"Cannot add a mapping from parquetColumnName=" + parquetColumnName
+ ": columnName=" + columnName + " already mapped to parquetColumnName="
+ this.parquetColumnName);
}
this.parquetColumnName = parquetColumnName;
return this;
}
@@ -367,6 +407,19 @@ public boolean useDictionary() {
public void useDictionary(final boolean useDictionary) {
this.useDictionary = useDictionary;
}

public OptionalInt fieldId() {
return fieldId == null ? OptionalInt.empty() : OptionalInt.of(fieldId);
}

public void setFieldId(final int fieldId) {
if (this.fieldId != null && this.fieldId != fieldId) {
throw new IllegalArgumentException(
String.format("Inconsistent fieldId for columnName=%s, already set fieldId=%d", columnName,
this.fieldId));
}
this.fieldId = fieldId;
}
}

private static final class ReadOnly extends ParquetInstructions {
@@ -424,8 +477,8 @@ private ReadOnly(
.collect(Collectors.toUnmodifiableList());
}

private String getOrDefault(final String columnName, final String defaultValue,
final Function<ColumnInstructions, String> fun) {
private <T> T getOrDefault(final String columnName, final T defaultValue,
final Function<ColumnInstructions, T> fun) {
if (columnNameToInstructions == null) {
return defaultValue;
}
@@ -480,6 +533,11 @@ public boolean useDictionary(final String columnName) {
return getOrDefault(columnName, false, ColumnInstructions::useDictionary);
}

@Override
public OptionalInt getFieldId(String columnName) {
return getOrDefault(columnName, OptionalInt.empty(), ColumnInstructions::fieldId);
}

@Override
public String getCompressionCodecName() {
return compressionCodecName;
@@ -656,75 +714,22 @@ public Builder(final ParquetInstructions parquetInstructions) {
indexColumns = readOnlyParquetInstructions.getIndexColumns().orElse(null);
}

private void newColumnNameToInstructionsMap() {
columnNameToInstructions = new KeyedObjectHashMap<>(new KeyedObjectKey.Basic<>() {
@Override
public String getKey(@NotNull final ColumnInstructions value) {
return value.getColumnName();
}
});
}

private void newParquetColumnNameToInstructionsMap() {
parquetColumnNameToInstructions =
new KeyedObjectHashMap<>(new KeyedObjectKey.Basic<>() {
@Override
public String getKey(@NotNull final ColumnInstructions value) {
return value.getParquetColumnName();
}
});
}

public Builder addColumnNameMapping(final String parquetColumnName, final String columnName) {
if (parquetColumnName.equals(columnName)) {
return this;
}
if (columnNameToInstructions == null) {
newColumnNameToInstructionsMap();
final ColumnInstructions ci = new ColumnInstructions(columnName);
ci.setParquetColumnName(parquetColumnName);
columnNameToInstructions.put(columnName, ci);
newParquetColumnNameToInstructionsMap();
parquetColumnNameToInstructions.put(parquetColumnName, ci);
return this;
}

ColumnInstructions ci = columnNameToInstructions.get(columnName);
if (ci != null) {
if (ci.parquetColumnName != null) {
if (ci.parquetColumnName.equals(parquetColumnName)) {
return this;
}
throw new IllegalArgumentException(
"Cannot add a mapping from parquetColumnName=" + parquetColumnName
+ ": columnName=" + columnName + " already mapped to parquetColumnName="
+ ci.parquetColumnName);
}
} else {
ci = new ColumnInstructions(columnName);
columnNameToInstructions.put(columnName, ci);
}

final ColumnInstructions ci = getOrCreateColumnInstructions(columnName);
ci.setParquetColumnName(parquetColumnName);
if (parquetColumnNameToInstructions == null) {
newParquetColumnNameToInstructionsMap();
parquetColumnNameToInstructions.put(parquetColumnName, ci);
return this;
parquetColumnNameToInstructions = new KeyedObjectHashMap<>(ColumnInstructions.PARQUET_COLUMN_NAME_KEY);
}

final ColumnInstructions fromParquetColumnNameInstructions =
parquetColumnNameToInstructions.get(parquetColumnName);
if (fromParquetColumnNameInstructions != null) {
if (fromParquetColumnNameInstructions == ci) {
return this;
}
final ColumnInstructions existing = parquetColumnNameToInstructions.putIfAbsent(parquetColumnName, ci);
if (existing != null) {
// Note: this is a limitation that doesn't need to exist. Technically, we could allow a single physical
// parquet column to manifest as multiple Deephaven columns.
throw new IllegalArgumentException(
"Cannot add new mapping from parquetColumnName=" + parquetColumnName + " to columnName="
+ columnName
+ ": already mapped to columnName="
+ fromParquetColumnNameInstructions.getColumnName());
+ existing.getColumnName());
}
ci.setParquetColumnName(parquetColumnName);
parquetColumnNameToInstructions.put(parquetColumnName, ci);
return this;
}

@@ -737,7 +742,7 @@ public Builder addColumnCodec(final String columnName, final String codecName) {
}

public Builder addColumnCodec(final String columnName, final String codecName, final String codecArgs) {
final ColumnInstructions ci = getColumnInstructions(columnName);
final ColumnInstructions ci = getOrCreateColumnInstructions(columnName);
ci.setCodecName(codecName);
ci.setCodecArgs(codecArgs);
return this;
@@ -751,21 +756,35 @@ public Builder addColumnCodec(final String columnName, final String codecName, f
* @param useDictionary The hint value
*/
public Builder useDictionary(final String columnName, final boolean useDictionary) {
final ColumnInstructions ci = getColumnInstructions(columnName);
final ColumnInstructions ci = getOrCreateColumnInstructions(columnName);
ci.useDictionary(useDictionary);
return this;
}

private ColumnInstructions getColumnInstructions(final String columnName) {
final ColumnInstructions ci;
/**
* This is currently only used for writing, allowing the setting of {@code field_id} in the proper Parquet
* {@code SchemaElement}.
*
* <p>
* Setting multiple field ids for a single column name is not allowed.
*
* <p>
* Field ids are not typically configured by end users.
*
* @param columnName the Deephaven column name
* @param fieldId the field id
*/
public Builder setFieldId(final String columnName, final int fieldId) {
final ColumnInstructions ci = getOrCreateColumnInstructions(columnName);
ci.setFieldId(fieldId);
return this;
}

private ColumnInstructions getOrCreateColumnInstructions(final String columnName) {
if (columnNameToInstructions == null) {
newColumnNameToInstructionsMap();
ci = new ColumnInstructions(columnName);
columnNameToInstructions.put(columnName, ci);
} else {
ci = columnNameToInstructions.putIfAbsent(columnName, ColumnInstructions::new);
columnNameToInstructions = new KeyedObjectHashMap<>(ColumnInstructions.COLUMN_NAME_KEY);
}
return ci;
return columnNameToInstructions.putIfAbsent(columnName, ColumnInstructions::new);
}

public Builder setCompressionCodecName(final String compressionCodecName) {
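
A hedged illustration of the consistency checks introduced above: repeating the same field id for a column is idempotent, while a conflicting id throws. Assumes the ParquetInstructions.builder() factory; the column name and ids are hypothetical.

import io.deephaven.parquet.table.ParquetInstructions;

public class FieldIdConflictDemo {
    public static void main(String[] args) {
        final ParquetInstructions.Builder builder = ParquetInstructions.builder();
        builder.setFieldId("Symbol", 7);
        builder.setFieldId("Symbol", 7); // OK: same id for the same column is a no-op
        try {
            builder.setFieldId("Symbol", 8); // conflicting id for the same column
        } catch (IllegalArgumentException e) {
            // "Inconsistent fieldId for columnName=Symbol, already set fieldId=7"
            System.out.println(e.getMessage());
        }
    }
}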
TypeInfos.java
@@ -13,6 +13,7 @@
import io.deephaven.util.codec.SerializableCodec;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
@@ -481,14 +482,17 @@ default Type createSchemaType(
isRepeating = false;
}
if (!isRepeating) {
instructions.getFieldId(columnDefinition.getName()).ifPresent(builder::id);
return builder.named(parquetColumnName);
}
// Note: the Parquet type builder would take care of the element name for us if we were constructing it
// ahead of time via ListBuilder.optionalElement
// (org.apache.parquet.schema.Types.BaseListBuilder.ElementBuilder.named) when we named the outer list; but
// since we are constructing types recursively (without regard to the outer type), we are responsible for
// setting the element name correctly at this point in time.
return Types.optionalList()
final Types.ListBuilder<GroupType> listBuilder = Types.optionalList();
instructions.getFieldId(columnDefinition.getName()).ifPresent(listBuilder::id);
return listBuilder
.element(builder.named(ELEMENT_NAME))
.named(parquetColumnName);
}
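
For reference, a small parquet-mr sketch (using only the standard org.apache.parquet.schema.Types API; the column names and ids are made up) showing where .id(...) lands, mirroring the two call sites above: on the column's own type for flat columns, and on the outer optional list for repeating columns.

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class SchemaFieldIdDemo {
    public static void main(String[] args) {
        final MessageType schema = Types.buildMessage()
                .addFields(
                        // Flat column: field_id goes on the primitive SchemaElement.
                        Types.optional(PrimitiveTypeName.INT32).id(1).named("Id"),
                        // Repeating column: field_id goes on the outer list group,
                        // like listBuilder.id(...) in the diff above.
                        Types.optionalList()
                                .id(2)
                                .optionalElement(PrimitiveTypeName.DOUBLE)
                                .named("Prices"))
                .named("root");
        // The printed schema shows the "= 1" / "= 2" field id annotations.
        System.out.println(schema);
    }
}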