diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java
index 0e652396e8a..769f6a234ea 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java
@@ -19,15 +19,18 @@
 import static io.delta.kernel.internal.util.PartitionUtils.serializePartitionMap;
 import static java.util.stream.Collectors.toMap;
 
+import io.delta.kernel.data.MapValue;
 import io.delta.kernel.data.Row;
 import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.internal.data.DelegateRow;
 import io.delta.kernel.internal.data.GenericRow;
 import io.delta.kernel.internal.fs.Path;
+import io.delta.kernel.internal.util.VectorUtils;
 import io.delta.kernel.types.*;
+import io.delta.kernel.utils.DataFileStatistics;
 import io.delta.kernel.utils.DataFileStatus;
 import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.IntStream;
 
 /** Delta log action representing an `AddFile` */
@@ -55,12 +58,15 @@ public class AddFile {
           .add(
               "tags",
               new MapType(StringType.STRING, StringType.STRING, true /* valueContainsNull */),
-              true /* nullable */);
+              true /* nullable */)
+          .add("baseRowId", LongType.LONG, true /* nullable */)
+          .add("defaultRowCommitVersion", LongType.LONG, true /* nullable */);
 
   public static final StructType SCHEMA_WITH_STATS = SCHEMA_WITHOUT_STATS.add(JSON_STATS_FIELD);
 
   /** Full schema of the {@code add} action in the Delta Log. */
   public static final StructType FULL_SCHEMA = SCHEMA_WITH_STATS;
+
   // There are more fields which are added when row-id tracking and clustering is enabled.
   // When Kernel starts supporting row-ids and clustering, we should add those fields here.
 
@@ -93,4 +99,149 @@ public static Row convertDataFileStatus(
     // any fields not present in the valueMap are considered null
     return new GenericRow(FULL_SCHEMA, valueMap);
   }
+
+  /**
+   * The underlying {@link Row} that represents an 'AddFile' action and contains all its field
+   * values. Can be either a {@link GenericRow} or a {@link DelegateRow}.
+   */
+  private final Row row;
+
+  public AddFile(Row row) {
+    this.row = row;
+  }
+
+  public Row toRow() {
+    return row;
+  }
+
+  public String getPath() {
+    return row.getString(COL_NAME_TO_ORDINAL.get("path"));
+  }
+
+  public MapValue getPartitionValues() {
+    return row.getMap(COL_NAME_TO_ORDINAL.get("partitionValues"));
+  }
+
+  public long getSize() {
+    return row.getLong(COL_NAME_TO_ORDINAL.get("size"));
+  }
+
+  public long getModificationTime() {
+    return row.getLong(COL_NAME_TO_ORDINAL.get("modificationTime"));
+  }
+
+  public boolean getDataChange() {
+    return row.getBoolean(COL_NAME_TO_ORDINAL.get("dataChange"));
+  }
+
+  public Optional<DeletionVectorDescriptor> getDeletionVector() {
+    int ordinal = COL_NAME_TO_ORDINAL.get("deletionVector");
+    return Optional.ofNullable(
+        row.isNullAt(ordinal) ? null : DeletionVectorDescriptor.fromRow(row.getStruct(ordinal)));
+  }
+
+  public Optional<MapValue> getTags() {
+    int ordinal = COL_NAME_TO_ORDINAL.get("tags");
+    return Optional.ofNullable(row.isNullAt(ordinal) ? null : row.getMap(ordinal));
+  }
+
+  public Optional<Long> getBaseRowId() {
+    int ordinal = COL_NAME_TO_ORDINAL.get("baseRowId");
+    return Optional.ofNullable(row.isNullAt(ordinal) ? null : row.getLong(ordinal));
+  }
+
+  public Optional<Long> getDefaultRowCommitVersion() {
+    int ordinal = COL_NAME_TO_ORDINAL.get("defaultRowCommitVersion");
+    return Optional.ofNullable(row.isNullAt(ordinal) ? null : row.getLong(ordinal));
+  }
+
+  public Optional<DataFileStatistics> getStats() {
+    int ordinal = COL_NAME_TO_ORDINAL.get("stats");
+    return Optional.ofNullable(
+        row.isNullAt(ordinal)
+            ? null
+            : DataFileStatistics.deserializeFromJson(row.getString(ordinal)).orElse(null));
+  }
+
+  public Optional<Long> getNumRecords() {
+    return this.getStats().map(DataFileStatistics::getNumRecords);
+  }
+
+  /**
+   * Returns a new {@link AddFile} with the provided baseRowId. Under the hood, this is achieved by
+   * creating a new {@link DelegateRow} with the baseRowId overridden.
+   */
+  public AddFile withNewBaseRowId(long baseRowId) {
+    Map<Integer, Object> overrides =
+        Collections.singletonMap(COL_NAME_TO_ORDINAL.get("baseRowId"), baseRowId);
+    return new AddFile(new DelegateRow(row, overrides));
+  }
+
+  /**
+   * Returns a new {@link AddFile} with the provided defaultRowCommitVersion. Under the hood, this
+   * is achieved by creating a new {@link DelegateRow} with the defaultRowCommitVersion overridden.
+   */
+  public AddFile withNewDefaultRowCommitVersion(long defaultRowCommitVersion) {
+    Map<Integer, Object> overrides =
+        Collections.singletonMap(
+            COL_NAME_TO_ORDINAL.get("defaultRowCommitVersion"), defaultRowCommitVersion);
+    return new AddFile(new DelegateRow(row, overrides));
+  }
+
+  @Override
+  public String toString() {
+    // Convert the partitionValues and tags to Java Maps and make them sorted by key.
+    Map<String, String> partitionValuesJavaMap = VectorUtils.toJavaMap(getPartitionValues());
+    Map<String, String> sortedPartitionValuesJavaMap = new TreeMap<>(partitionValuesJavaMap);
+
+    Optional<Map<String, String>> tagsJavaMap = getTags().map(VectorUtils::toJavaMap);
+    Optional<Map<String, String>> sortedTagsJavaMap = tagsJavaMap.map(TreeMap::new);
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("AddFile{");
+    sb.append("path='").append(getPath()).append('\'');
+    sb.append(", partitionValues=").append(sortedPartitionValuesJavaMap);
+    sb.append(", size=").append(getSize());
+    sb.append(", modificationTime=").append(getModificationTime());
+    sb.append(", dataChange=").append(getDataChange());
+    sb.append(", deletionVector=").append(getDeletionVector());
+    sb.append(", tags=").append(sortedTagsJavaMap);
+    sb.append(", baseRowId=").append(getBaseRowId());
+    sb.append(", defaultRowCommitVersion=").append(getDefaultRowCommitVersion());
+    sb.append(", stats=").append(getStats());
+    sb.append('}');
+    return sb.toString();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (!(obj instanceof AddFile)) return false;
+    AddFile other = (AddFile) obj;
+    return getSize() == other.getSize()
+        && getModificationTime() == other.getModificationTime()
+        && getDataChange() == other.getDataChange()
+        && Objects.equals(getPath(), other.getPath())
+        && Objects.equals(getPartitionValues(), other.getPartitionValues())
+        && Objects.equals(getDeletionVector(), other.getDeletionVector())
+        && Objects.equals(getTags(), other.getTags())
+        && Objects.equals(getBaseRowId(), other.getBaseRowId())
+        && Objects.equals(getDefaultRowCommitVersion(), other.getDefaultRowCommitVersion())
+        && Objects.equals(getStats(), other.getStats());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        getPath(),
+        getPartitionValues(),
+        getSize(),
+        getModificationTime(),
+        getDataChange(),
+        getDeletionVector(),
+        getTags(),
+        getBaseRowId(),
+        getDefaultRowCommitVersion(),
+        getStats());
+  }
 }
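Reviewer note (not part of the patch): a minimal usage sketch of the AddFile wrapper introduced above. The `addFileRow` variable is a stand-in for any Row conforming to AddFile.FULL_SCHEMA (for example one produced by convertDataFileStatus or read back from the Delta log); everything else is the API added in this file.

    // Wrap an existing `add` action row; the getters simply delegate to the backing Row.
    AddFile addFile = new AddFile(addFileRow);
    long size = addFile.getSize();
    Optional<Long> baseRowId = addFile.getBaseRowId(); // Optional.empty() when the field is null

    // "Updates" never mutate the original row; they return a new AddFile backed by a
    // DelegateRow that overrides only the requested ordinals.
    AddFile updated = addFile.withNewBaseRowId(100L).withNewDefaultRowCommitVersion(5L);
    Row rowToCommit = updated.toRow(); // still conforms to AddFile.FULL_SCHEMA

Keeping AddFile as a thin view over a Row means a single-field change does not require copying the whole action row.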
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/DelegateRow.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/DelegateRow.java
new file mode 100644
index 00000000000..82897201095
--- /dev/null
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/DelegateRow.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright (2024) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.internal.data;
+
+import static java.util.Objects.requireNonNull;
+
+import io.delta.kernel.data.ArrayValue;
+import io.delta.kernel.data.MapValue;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.types.*;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This wraps an existing {@link Row} and allows overriding values for some particular ordinals.
+ * This enables creating a modified view of a row without mutating the original row.
+ */
+public class DelegateRow implements Row {
+
+  /** The underlying row being delegated to. */
+  private final Row row;
+
+  /**
+   * A map of ordinal-to-value overrides that takes precedence over the underlying row's data. When
+   * accessing data, this map is checked first before falling back to the underlying row.
+   */
+  private final Map<Integer, Object> overrides;
+
+  public DelegateRow(Row row, Map<Integer, Object> overrides) {
+    requireNonNull(row, "row is null");
+    requireNonNull(overrides, "map of overrides is null");
+
+    if (row instanceof DelegateRow) {
+      // If the row is already a delegation of another row, we merge the overrides and keep only
+      // one layer of delegation.
+      DelegateRow delegateRow = (DelegateRow) row;
+      this.row = delegateRow.row;
+      this.overrides = new HashMap<>(delegateRow.overrides);
+      this.overrides.putAll(overrides);
+    } else {
+      this.row = row;
+      this.overrides = new HashMap<>(overrides);
+    }
+  }
+
+  @Override
+  public StructType getSchema() {
+    return row.getSchema();
+  }
+
+  @Override
+  public boolean isNullAt(int ordinal) {
+    if (overrides.containsKey(ordinal)) {
+      return overrides.get(ordinal) == null;
+    }
+    return row.isNullAt(ordinal);
+  }
+
+  @Override
+  public boolean getBoolean(int ordinal) {
+    if (overrides.containsKey(ordinal)) {
+      throwIfUnsafeAccess(ordinal, BooleanType.class, "boolean");
+      return (boolean) overrides.get(ordinal);
+    }
+    return row.getBoolean(ordinal);
+  }
+
+  @Override
+  public byte getByte(int ordinal) {
+    if (overrides.containsKey(ordinal)) {
+      throwIfUnsafeAccess(ordinal, ByteType.class, "byte");
+      return (byte) overrides.get(ordinal);
+    }
+    return row.getByte(ordinal);
+  }
+
+  @Override
+  public short getShort(int ordinal) {
+    if (overrides.containsKey(ordinal)) {
+      throwIfUnsafeAccess(ordinal, ShortType.class, "short");
+      return (short) overrides.get(ordinal);
+    }
+    return row.getShort(ordinal);
+  }
+
+  @Override
+  public int getInt(int ordinal) {
+    if (overrides.containsKey(ordinal)) {
+      throwIfUnsafeAccess(ordinal, IntegerType.class, "integer");
+      return (int) overrides.get(ordinal);
+    }
+    return row.getInt(ordinal);
+  }
+
+  @Override
+  public long getLong(int ordinal) {
+    if (overrides.containsKey(ordinal)) {
+      throwIfUnsafeAccess(ordinal, LongType.class, "long");
+      return (long) overrides.get(ordinal);
+    }
+    return row.getLong(ordinal);
+  }
+
+  @Override
+  public float getFloat(int ordinal) {
+    if (overrides.containsKey(ordinal)) {
+      throwIfUnsafeAccess(ordinal, FloatType.class, "float");
+      return (float) overrides.get(ordinal);
+    }
+    return row.getFloat(ordinal);
+  }
+
+  @Override
+  public double getDouble(int ordinal) {
+    if (overrides.containsKey(ordinal)) {
+      throwIfUnsafeAccess(ordinal, DoubleType.class, "double");
+      return (double) overrides.get(ordinal);
+    }
+    return row.getDouble(ordinal);
+  }
+
+  @Override
+  public String getString(int ordinal) {
+    if (overrides.containsKey(ordinal)) {
+      throwIfUnsafeAccess(ordinal, StringType.class, "string");
+      return (String) overrides.get(ordinal);
+    }
+    return row.getString(ordinal);
+  }
+
+  @Override
+  public BigDecimal getDecimal(int ordinal) {
+    if (overrides.containsKey(ordinal)) {
+      throwIfUnsafeAccess(ordinal, DecimalType.class, "decimal");
+      return (BigDecimal) overrides.get(ordinal);
+    }
+    return row.getDecimal(ordinal);
+  }
+
+  @Override
+  public byte[] getBinary(int ordinal) {
+    if (overrides.containsKey(ordinal)) {
+      throwIfUnsafeAccess(ordinal, BinaryType.class, "binary");
+      return (byte[]) overrides.get(ordinal);
+    }
+    return row.getBinary(ordinal);
+  }
+
+  @Override
+  public Row getStruct(int ordinal) {
+    if (overrides.containsKey(ordinal)) {
+      throwIfUnsafeAccess(ordinal, StructType.class, "struct");
+      return (Row) overrides.get(ordinal);
+    }
+    return row.getStruct(ordinal);
+  }
+
+  @Override
+  public ArrayValue getArray(int ordinal) {
+    if (overrides.containsKey(ordinal)) {
+      // TODO: Not sufficient check, also need to check the element type. This should be revisited
+      // together with the GenericRow.
+      throwIfUnsafeAccess(ordinal, ArrayType.class, "array");
+      return (ArrayValue) overrides.get(ordinal);
+    }
+    return row.getArray(ordinal);
+  }
+
+  @Override
+  public MapValue getMap(int ordinal) {
+    if (overrides.containsKey(ordinal)) {
+      // TODO: Not sufficient check, also need to check the element type. This should be revisited
+      // together with the GenericRow.
+      throwIfUnsafeAccess(ordinal, MapType.class, "map");
+      return (MapValue) overrides.get(ordinal);
+    }
+    return row.getMap(ordinal);
+  }
+
+  private void throwIfUnsafeAccess(
+      int ordinal, Class<? extends DataType> expDataType, String accessType) {
+
+    DataType actualDataType = dataType(ordinal);
+    if (!expDataType.isAssignableFrom(actualDataType.getClass())) {
+      String msg =
+          String.format(
+              "Trying to access a `%s` value from vector of type `%s`", accessType, actualDataType);
+      throw new UnsupportedOperationException(msg);
+    }
+  }
+
+  private DataType dataType(int ordinal) {
+    if (row.getSchema().length() <= ordinal) {
+      throw new IllegalArgumentException("invalid ordinal: " + ordinal);
+    }
+
+    return row.getSchema().at(ordinal).getDataType();
+  }
+}
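Reviewer note (not part of the patch): a sketch of the delegation semantics implemented above, assuming `addFileRow` is a Row with AddFile.FULL_SCHEMA, where `baseRowId` sits at ordinal 7 per the schema defined in this PR.

    Map<Integer, Object> first = Collections.singletonMap(7, 1L);
    Row once = new DelegateRow(addFileRow, first);                   // getLong(7) now returns 1L

    // Wrapping a DelegateRow does not stack delegation layers: the constructor merges the
    // override maps and delegates directly to the original addFileRow.
    Row twice = new DelegateRow(once, Collections.singletonMap(7, 2L));
    long baseRowId = twice.getLong(7);                               // 2L; other ordinals read through

    // Type-mismatched access to an overridden ordinal fails fast instead of returning junk:
    // twice.getString(7) -> UnsupportedOperationException ("Trying to access a `string` value ...")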
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatistics.java b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatistics.java
index 41006deffec..6c3fb1cdd78 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatistics.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/DataFileStatistics.java
@@ -19,8 +19,10 @@
 
 import io.delta.kernel.expressions.Column;
 import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.internal.util.JsonUtils;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 
 /** Statistics about data file in a Delta Lake table. */
 public class DataFileStatistics {
@@ -101,4 +103,26 @@ public String serializeAsJson() {
     // For now just serialize the number of records.
     return "{\"numRecords\":" + numRecords + "}";
   }
+
+  /**
+   * Deserialize the statistics from a JSON string. For now only the number of records is
+   * deserialized; the rest of the statistics are not supported yet.
+   *
+   * @param json Data statistics JSON string to deserialize.
+   * @return An {@link Optional} containing the deserialized {@link DataFileStatistics} if present.
+   */
+  public static Optional<DataFileStatistics> deserializeFromJson(String json) {
+    Map<String, String> keyValueMap = JsonUtils.parseJSONKeyValueMap(json);
+
+    // For now just deserialize the number of records which will be used by row tracking.
+    String numRecordsStr = keyValueMap.get("numRecords");
+    if (numRecordsStr == null) {
+      return Optional.empty();
+    }
+    long numRecords = Long.parseLong(numRecordsStr);
+    DataFileStatistics stats =
+        new DataFileStatistics(
+            numRecords, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
+    return Optional.of(stats);
+  }
 }
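Reviewer note (not part of the patch): the stats round-trip these two methods now support. Only numRecords survives deserialization for now; all other statistics are dropped, which is what AddFile.getNumRecords relies on.

    Optional<DataFileStatistics> stats =
        DataFileStatistics.deserializeFromJson("{\"numRecords\":100}");
    long numRecords = stats.map(DataFileStatistics::getNumRecords).orElse(0L); // 100
    String json = stats.get().serializeAsJson();                               // {"numRecords":100}
    // A JSON string without a numRecords key deserializes to Optional.empty().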
diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/actions/AddFileSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/actions/AddFileSuite.scala
new file mode 100644
index 00000000000..19a32a1478a
--- /dev/null
+++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/actions/AddFileSuite.scala
@@ -0,0 +1,173 @@
+/*
+ * Copyright (2024) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.internal.actions
+
+import io.delta.kernel.data.Row
+import io.delta.kernel.internal.data.GenericRow
+import io.delta.kernel.internal.util.VectorUtils
+import io.delta.kernel.internal.util.VectorUtils.stringStringMapValue
+import org.scalatest.funsuite.AnyFunSuite
+
+import java.util.{HashMap => JHashMap}
+import java.util.Optional
+import scala.collection.JavaConverters._
+
+class AddFileSuite extends AnyFunSuite {
+
+  /**
+   * Generate a GenericRow representing an AddFile action with provided fields.
+   */
+  private def generateTestAddFileRow(
+      path: String = "path",
+      partitionValues: Map[String, String] = Map.empty,
+      size: Long = 10L,
+      modificationTime: Long = 20L,
+      dataChange: Boolean = true,
+      deletionVector: Option[DeletionVectorDescriptor] = Option.empty,
+      tags: Option[Map[String, String]] = Option.empty,
+      baseRowId: Option[Long] = Option.empty,
+      defaultRowCommitVersion: Option[Long] = Option.empty,
+      stats: Option[String] = Option.empty
+  ): Row = {
+    // Generate a GenericRow representing an AddFile action with provided values for testing
+    val schema = AddFile.FULL_SCHEMA
+    val nameToOrdinal: Map[String, Int] = (0 until schema.length).map { i =>
+      schema.at(i).getName -> i
+    }.toMap
+
+    val valueMap = new JHashMap[Integer, Object]()
+
+    valueMap.put(nameToOrdinal("path"), path)
+    valueMap.put(nameToOrdinal("partitionValues"), stringStringMapValue(partitionValues.asJava))
+    valueMap.put(nameToOrdinal("size"), size.asInstanceOf[java.lang.Long])
+    valueMap.put(nameToOrdinal("modificationTime"), modificationTime.asInstanceOf[java.lang.Long])
+    valueMap.put(nameToOrdinal("dataChange"), dataChange.asInstanceOf[java.lang.Boolean])
+
+    if (deletionVector.isDefined) {
+      // DeletionVectorDescriptor currently does not provide a way to convert to a Row
+      assert(false, "DeletionVectorDescriptor is not supported in AddFileSuite")
+    }
+    if (tags.isDefined) {
+      valueMap.put(nameToOrdinal("tags"), stringStringMapValue(tags.get.asJava))
+    }
+    if (baseRowId.isDefined) {
+      valueMap.put(nameToOrdinal("baseRowId"), baseRowId.get.asInstanceOf[java.lang.Long])
+    }
+    if (defaultRowCommitVersion.isDefined) {
+      valueMap.put(
+        nameToOrdinal("defaultRowCommitVersion"),
+        defaultRowCommitVersion.get.asInstanceOf[java.lang.Long]
+      )
+    }
+    if (stats.isDefined) {
+      valueMap.put(nameToOrdinal("stats"), stats.get)
+    }
+
+    new GenericRow(schema, valueMap)
+  }
+
+  test("getters can read AddFile's fields from the backing row") {
+    val addFileRow = generateTestAddFileRow(
+      path = "test/path",
+      partitionValues = Map("a" -> "1"),
+      size = 1L,
+      modificationTime = 10L,
+      dataChange = true,
+      deletionVector = Option.empty,
+      tags = Option(Map("tag1" -> "value1")),
+      baseRowId = Option(30L),
+      defaultRowCommitVersion = Option(40L),
+      stats = Option("{\"numRecords\":100}")
+    )
+
+    val addFile = new AddFile(addFileRow)
+
+    assert(addFile.getPath === "test/path")
+    assert(
+      VectorUtils.toJavaMap(addFile.getPartitionValues).asScala.equals(Map("a" -> "1"))
+    )
+    assert(addFile.getSize === 1L)
+    assert(addFile.getModificationTime === 10L)
+    assert(addFile.getDataChange === true)
+    assert(addFile.getDeletionVector === Optional.empty())
+    assert(
+      VectorUtils.toJavaMap(addFile.getTags.get()).asScala.equals(Map("tag1" -> "value1"))
+    )
+    assert(addFile.getBaseRowId === Optional.of(30L))
+    assert(addFile.getDefaultRowCommitVersion === Optional.of(40L))
+    // DataFileStatistics doesn't have an equals() override, so we need to compare the string
+    assert(addFile.getStats.get().toString === "{\"numRecords\":100}")
+    assert(addFile.getNumRecords === Optional.of(100L))
+  }
+
+  test("update a single field of an AddFile") {
+    val addFileRow = generateTestAddFileRow(baseRowId = Option(1L))
+    val addFileAction = new AddFile(addFileRow)
+
+    val updatedAddFileAction = addFileAction.withNewBaseRowId(2L)
+    assert(updatedAddFileAction.getBaseRowId === Optional.of(2L))
+
+    val updatedAddFileRow = updatedAddFileAction.toRow
+    assert(new AddFile(updatedAddFileRow).getBaseRowId === Optional.of(2L))
+  }
+
+  test("update multiple fields of an AddFile multiple times") {
+    val baseAddFileRow =
+      generateTestAddFileRow(
+        path = "test/path",
+        baseRowId = Option(0L),
+        defaultRowCommitVersion = Option(0L)
+      )
+    var addFileAction = new AddFile(baseAddFileRow)
+
+    (1L until 10L).foreach { i =>
+      addFileAction = addFileAction
+        .withNewBaseRowId(i)
+        .withNewDefaultRowCommitVersion(i * 10)
+
+      assert(addFileAction.getPath === "test/path")
+      assert(addFileAction.getBaseRowId === Optional.of(i))
+      assert(addFileAction.getDefaultRowCommitVersion === Optional.of(i * 10))
+    }
+  }
+
+  test("toString correctly prints out AddFile with partition values / tags in sorted order") {
+    val addFileRow = generateTestAddFileRow(
+      path = "test/path",
+      partitionValues = Map("b" -> "2", "a" -> "1"),
+      size = 100L,
+      modificationTime = 1234L,
+      dataChange = true,
+      tags = Option(Map("tag1" -> "value1", "tag2" -> "value2")),
+      baseRowId = Option(12345L),
+      defaultRowCommitVersion = Option(67890L),
+      stats = Option("{\"numRecords\":10000}")
+    )
+    val addFile = new AddFile(addFileRow)
+    val expectedString = "AddFile{" +
+      "path='test/path', " +
+      "partitionValues={a=1, b=2}, " +
+      "size=100, " +
+      "modificationTime=1234, " +
+      "dataChange=true, " +
+      "deletionVector=Optional.empty, " +
+      "tags=Optional[{tag1=value1, tag2=value2}], " +
+      "baseRowId=Optional[12345], " +
+      "defaultRowCommitVersion=Optional[67890], " +
+      "stats=Optional[{\"numRecords\":10000}]}"
+    assert(addFile.toString == expectedString)
+  }
+}
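Reviewer note (not part of the patch): a hedged sketch of how these pieces presumably compose for row-ID assignment. The loop below, and the `newAddFiles`, `commitVersion`, and `rowIdHighWatermark` values it assumes, are illustrative only; Kernel's actual row-tracking logic is not part of this change.

    // Hypothetical: assign contiguous base row IDs to freshly written files using the
    // per-file record counts recovered from their serialized stats.
    long nextRowId = rowIdHighWatermark + 1; // assumed to come from table metadata
    List<AddFile> assigned = new ArrayList<>();
    for (AddFile add : newAddFiles) {
      assigned.add(
          add.withNewBaseRowId(nextRowId).withNewDefaultRowCommitVersion(commitVersion));
      nextRowId +=
          add.getNumRecords()
              .orElseThrow(() -> new IllegalStateException("numRecords stats are required"));
    }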