From 069a1f9f4c3e2558b2eeac68b917f07ee7f6f60f Mon Sep 17 00:00:00 2001 From: Venki Korukanti Date: Tue, 14 May 2024 18:12:33 -0700 Subject: [PATCH] [Kernel] Support for loading protocol and metadata from checksum file in DeltaLog Initial read current version CRC w test to verify wip end-2-end working + tests Co-authored-by: Allison Portis Co-authored-by: Venki Korukanti --- .../internal/replay/ChecksumReader.java | 133 +++++++++ .../kernel/internal/replay/LogReplay.java | 33 +++ .../kernel/internal/replay/VersionStats.java | 74 +++++ .../delta/kernel/internal/util/FileNames.java | 25 +- .../io/delta/kernel/utils/FileStatus.java | 10 + ...4842-81ec-0bc894621351.c000.snappy.parquet | Bin 0 -> 1062 bytes ...4f65-8d62-fc008bfb7b57.c000.snappy.parquet | Bin 0 -> 1062 bytes .../_delta_log/00000000000000000000.crc | 1 + .../_delta_log/00000000000000000000.json | 3 + .../_delta_log/00000000000000000001.crc | 1 + .../_delta_log/00000000000000000001.json | 2 + .../_delta_log/00000000000000000002.crc | 1 + .../_delta_log/00000000000000000002.json | 2 + .../_delta_log/00000000000000000003.crc | 1 + .../_delta_log/00000000000000000003.json | 2 + .../_delta_log/00000000000000000004.crc | 1 + .../_delta_log/00000000000000000004.json | 5 + .../_delta_log/00000000000000000005.crc | 1 + .../_delta_log/00000000000000000005.json | 4 + .../_delta_log/00000000000000000006.crc | 1 + .../_delta_log/00000000000000000006.json | 4 + ...4655-a705-09e597411943.c000.snappy.parquet | Bin 0 -> 791 bytes ...4271-9d69-d02fd204dd63.c000.snappy.parquet | Bin 0 -> 803 bytes ...47c9-9a04-75ab604160da.c000.snappy.parquet | Bin 0 -> 1011 bytes ...467a-b05f-f56c8f582226.c000.snappy.parquet | Bin 0 -> 1008 bytes ...4709-aec0-7b486bef2db1.c000.snappy.parquet | Bin 0 -> 791 bytes ...4e56-86eb-893d16d8ab7b.c000.snappy.parquet | Bin 0 -> 791 bytes .../defaults/CreateCheckpointSuite.scala | 4 +- .../defaults/DeltaTableReadsSuite.scala | 9 + .../defaults/DeltaTableWriteSuiteBase.scala | 3 +- .../defaults/LogReplayMetricsSuite.scala | 271 +++++++++++++----- .../kernel/defaults/utils/TestUtils.scala | 4 + 32 files changed, 519 insertions(+), 76 deletions(-) create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/VersionStats.java create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/_change_data/birthday=2020-01-01/cdc-00000-993231ce-e967-4842-81ec-0bc894621351.c000.snappy.parquet create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/_change_data/birthday=2020-01-01/cdc-00000-cb387749-d66a-4f65-8d62-fc008bfb7b57.c000.snappy.parquet create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000000.crc create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000000.json create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000001.crc create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000001.json create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000002.crc create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000002.json create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000003.crc create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000003.json create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000004.crc create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000004.json create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000005.crc create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000005.json create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000006.crc create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000006.json create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/birthday=2020-01-01/part-00000-21994048-f0e7-4655-a705-09e597411943.c000.snappy.parquet create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/birthday=2020-01-01/part-00000-29c991a0-4944-4271-9d69-d02fd204dd63.c000.snappy.parquet create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/birthday=2020-01-01/part-00000-8da4d42d-b38d-47c9-9a04-75ab604160da.c000.snappy.parquet create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/birthday=2020-01-01/part-00000-a0aceea7-2c64-467a-b05f-f56c8f582226.c000.snappy.parquet create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/birthday=2020-01-01/part-00000-aebb2514-795c-4709-aec0-7b486bef2db1.c000.snappy.parquet create mode 100644 kernel/kernel-defaults/src/test/resources/stream_table_optimize/birthday=2020-01-01/part-00000-d6f5bfd1-5f1e-4e56-86eb-893d16d8ab7b.c000.snappy.parquet diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java new file mode 100644 index 00000000000..3c4c5b045e1 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java @@ -0,0 +1,133 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.replay; + +import java.util.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; + +import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.util.FileNames; +import static io.delta.kernel.internal.replay.VersionStats.fromColumnarBatch; +import static io.delta.kernel.internal.util.FileNames.checksumFile; +import static io.delta.kernel.internal.util.FileNames.isChecksumFile; +import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator; + +/** + * Utility method to load protocol and metadata from the Delta log checksum files. + */ +public class ChecksumReader { + private static final Logger logger = LoggerFactory.getLogger(ChecksumReader.class); + + /** + * Load the protocol and metadata from the checksum file at the given version. If the checksum + * file is not found at the given version, it will try to find the latest checksum file that is + * created after the lower bound version or within the last 100 versions. + * + * @param engine the engine to use for reading the checksum file + * @param logPath the path to the Delta log + * @param readVersion the version to read the checksum file from + * @param lowerBoundOpt the lower bound version to search for the checksum file + * @return Optional {@link VersionStats} containing the protocol and metadata, and the version + * of the checksum file. If the checksum file is not found, it will return an empty + */ + public static Optional getVersionStats( + Engine engine, + Path logPath, + long readVersion, + Optional lowerBoundOpt) { + + // First try to load the CRC at given version. If not found or failed to read then try to + // find the latest CRC file that is created after the lower bound version or within the + // last 100 versions. + Path crcFilePath = checksumFile(logPath, readVersion); + Optional versionStatsOpt = readChecksumFile(engine, crcFilePath); + if (versionStatsOpt.isPresent() || + // we don't expect any more checksum files as it is the first version + readVersion == 0) { + return versionStatsOpt; + } + + // Try to list the last 100 CRC files and see if we can find a CRC that we can use + long lowerBound = Math.max( + lowerBoundOpt.orElse(0L) + 1, + Math.max(0, readVersion - 100)); + + Path listFrom = checksumFile(logPath, lowerBound); + try (CloseableIterator crcFiles = + engine.getFileSystemClient().listFrom(listFrom.toString())) { + + List crcFilesList = new ArrayList<>(); + crcFiles.filter(file -> isChecksumFile(new Path(file.getPath()))) + .forEachRemaining(crcFilesList::add); + + // pick the last file which is the latest version that has the CRC file + if (crcFilesList.isEmpty()) { + logger.warn("No checksum files found in the range {} to {}", lowerBound, + readVersion); + return Optional.empty(); + } + + FileStatus latestCRCFile = crcFilesList.get(crcFilesList.size() - 1); + return readChecksumFile(engine, new Path(latestCRCFile.getPath())); + } catch (Exception e) { + logger.warn("Failed to list checksum files from {}", listFrom, e); + return Optional.empty(); + } + + } + + private static Optional readChecksumFile(Engine engine, Path filePath) { + try (CloseableIterator iter = engine.getJsonHandler() + .readJsonFiles( + singletonCloseableIterator(FileStatus.of(filePath.toString())), + VersionStats.FULL_SCHEMA, + Optional.empty())) { + // We do this instead of iterating through the rows or using `getSingularRow` so we + // can use the existing fromColumnVector methods in Protocol, Metadata, Format etc + if (!iter.hasNext()) { + logger.warn("Checksum file is empty: {}", filePath); + return Optional.empty(); + } + + ColumnarBatch batch = iter.next(); + if (batch.getSize() != 1) { + String msg = "Expected exactly one row in the checksum file {}, found {} rows"; + logger.warn(msg, filePath, batch.getSize()); + return Optional.empty(); + } + + long crcVersion = FileNames.checksumVersion(filePath); + + VersionStats versionStats = fromColumnarBatch(engine, crcVersion, batch, 0 /* rowId */); + if (versionStats.getMetadata() == null || versionStats.getProtocol() == null) { + logger.warn("Invalid checksum file missing protocol and/or metadata: {}", filePath); + return Optional.empty(); + } + return Optional.of(versionStats); + } catch (Exception e) { + // This can happen when the version does not have a checksum file + logger.warn("Failed to read checksum file {}", filePath, e); + return Optional.empty(); + } + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java index c6a4c945f16..1e25014ebee 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java @@ -199,6 +199,39 @@ protected Tuple2 loadTableProtocolAndMetadata( ); } + // Compute the lower bound for the CRC search + // If the snapshot hint is present, we can use it as the lower bound for the CRC search. + // TODO: this can be further improved to make the lower bound until the checkpoint version + Optional crcSearchLowerBound = snapshotHint.map(SnapshotHint::getVersion); + + Optional versionStatsOpt = + ChecksumReader.getVersionStats( + engine, + logSegment.logPath, + snapshotVersion, + crcSearchLowerBound); + + if (versionStatsOpt.isPresent()) { + // We found the protocol and metadata for the version we are looking for + VersionStats versionStats = versionStatsOpt.get(); + if (versionStats.getVersion() == snapshotVersion) { + return new Tuple2<>( + versionStats.getProtocol(), + versionStats.getMetadata() + ); + } + + // We found the protocol and metadata in a version older than the one we are looking + // for. We need to replay the actions to get the latest protocol and metadata, but + // update the hint to read the actions from the version we found to check if the + // protocol and metadata are updated in the versions after the one we found. + snapshotHint = Optional.of(new SnapshotHint( + versionStats.getVersion(), + versionStats.getProtocol(), + versionStats.getMetadata() + )); + } + Protocol protocol = null; Metadata metadata = null; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/VersionStats.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/VersionStats.java new file mode 100644 index 00000000000..dcbd3001063 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/VersionStats.java @@ -0,0 +1,74 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.replay; + +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.StructType; +import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.actions.Protocol; + +public class VersionStats { + + public static VersionStats fromColumnarBatch( + Engine engine, long version, ColumnarBatch batch, int rowId) { + // fromColumnVector already takes care of nulls + Protocol protocol = Protocol.fromColumnVector( + batch.getColumnVector(PROTOCOL_ORDINAL), rowId); + Metadata metadata = Metadata.fromColumnVector( + batch.getColumnVector(METADATA_ORDINAL), rowId, engine); + return new VersionStats(version, metadata, protocol); + } + + // We can add additional fields later + public static final StructType FULL_SCHEMA = new StructType() + .add("protocol", Protocol.FULL_SCHEMA) + .add("metadata", Metadata.FULL_SCHEMA); + + private static final int PROTOCOL_ORDINAL = 0; + private static final int METADATA_ORDINAL = 1; + + private final long version; + private final Metadata metadata; + private final Protocol protocol; + + protected VersionStats(long version, Metadata metadata, Protocol protocol) { + this.version = version; + this.metadata = metadata; + this.protocol = protocol; + } + + /** + * The version of the Delta table that this VersionStats represents. + */ + public long getVersion() { + return version; + } + + /** + * The {@link Metadata} stored in this VersionStats. May be null. + */ + public Metadata getMetadata() { + return metadata; + } + + /** + * The {@link Protocol} stored in this VersionStats. May be null. + */ + public Protocol getProtocol() { + return protocol; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java index d8e015e603f..b3a0a82d42e 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java @@ -42,6 +42,8 @@ private FileNames() {} private static final Pattern MULTI_PART_CHECKPOINT_FILE_PATTERN = Pattern.compile("(\\d+)\\.checkpoint\\.\\d+\\.\\d+\\.parquet"); + private static final Pattern checksumFileRegex = Pattern.compile("(\\d+)\\.crc"); + public static final String SIDECAR_DIRECTORY = "_sidecars"; /** @@ -81,6 +83,17 @@ public static String sidecarFile(Path path, String sidecar) { return String.format("%s/%s/%s", path.toString(), SIDECAR_DIRECTORY, sidecar); } + /** + * Returns the path to the checksum file for the given version. + */ + public static Path checksumFile(Path path, long version) { + return new Path(path, String.format("%020d.crc", version)); + } + + public static long checksumVersion(Path path) { + return Long.parseLong(path.getName().split("\\.")[0]); + } + /** * Returns the prefix of all delta log files for the given version. *

@@ -159,11 +172,14 @@ public static boolean isV2CheckpointFile(String fileName) { return V2_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches(); } - public static boolean isCommitFile(String fileName) { return DELTA_FILE_PATTERN.matcher(new Path(fileName).getName()).matches(); } + public static boolean isChecksumFile(Path checksumFilePath) { + return checksumFileRegex.matcher(checksumFilePath.getName()).matches(); + } + /** * Get the version of the checkpoint, checksum or delta file. Throws an error if an unexpected * file type is seen. These unexpected files should be filtered out to ensure forward @@ -175,12 +191,11 @@ public static long getFileVersion(Path path) { return checkpointVersion(path); } else if (isCommitFile(path.getName())) { return deltaVersion(path); - //} else if (isChecksumFile(path)) { - // checksumVersion(path); + } else if (isChecksumFile(path)) { + return checksumVersion(path); } else { throw new IllegalArgumentException( - String.format("Unexpected file type found in transaction log: %s", path) - ); + String.format("Unexpected file type found in transaction log: %s", path)); } } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/FileStatus.java b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/FileStatus.java index 28fdf609219..c82ae456b64 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/FileStatus.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/FileStatus.java @@ -78,4 +78,14 @@ public long getModificationTime() { public static FileStatus of(String path, long size, long modificationTime) { return new FileStatus(path, size, modificationTime); } + + /** + * Create a {@link FileStatus} with the given path with size and modification time set to 0. + * + * @param path Fully qualified file path. + * @return {@link FileStatus} object + */ + public static FileStatus of(String path) { + return new FileStatus(path, 0 /* size */, 0 /* modTime */); + } } diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_change_data/birthday=2020-01-01/cdc-00000-993231ce-e967-4842-81ec-0bc894621351.c000.snappy.parquet b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_change_data/birthday=2020-01-01/cdc-00000-993231ce-e967-4842-81ec-0bc894621351.c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..4b7c83eb4e16259c33320b33ff951b2234c2d8c7 GIT binary patch literal 1062 zcma)+O=}ZD7{{O8)EL_w6gtDKEEtwHw2-cwrin?(MXe=5DIwHLwUpV-Bwf1Mq`MoX zlw1@q;yK_|zeGQXdKA4BPksiUeTiMkrLa3M&-|YMJoBGq+mAbC0)&$LkR`_<>g6mqv2BUwbc!V!T` z?#T+1Hp7?53I&J)3n8Yks7r;jNe)W}$&yNQq=l)VsI*OXlhITnop6ZIUMug9xOaN& zTPI?`Nft4QhU_E?s@D!Lj2RWk8t@G#m2aOs7U~CR}Ej-^I zy6jd(ujpl9>vM#E=ID5RXJZlQ-tdh+5CQM@Cu2Ug|c4W)QyeMCDXI5Q-8xSjIF|&Ikuf{ yq0%c?jS6qn>!N2?dfaSOO|#c9w(8|>PZ&6TROO;(tt(CZZ^RF@G5~G-Li`29Bl}GN literal 0 HcmV?d00001 diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_change_data/birthday=2020-01-01/cdc-00000-cb387749-d66a-4f65-8d62-fc008bfb7b57.c000.snappy.parquet b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_change_data/birthday=2020-01-01/cdc-00000-cb387749-d66a-4f65-8d62-fc008bfb7b57.c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..a1d311a720c85b49d908671c8ba32911f7ccad82 GIT binary patch literal 1062 zcma)+O=}ZD7{{O8)HJj?D0GHdSuiYZXdzv9O%s!ni&{&BQbMVhs)X6iBwf1Mw7VOn zgj~D`9z6B~c=9A(1iygaKs<`4;>nL7KKl~8l1pKBUVqPjp83!2b{-s<1PCR!$%oIM z-Yu#`#$lcm0G8Kf0LV!a))BKp82S3<v4hs%`??m?MHSSI6r+8;el)N3Zmu2zif(oc)j{T696$wibl`*bcRd)^~*4 z3$*QL6D_(OT#B1Y26Vgug5KD5x#c2(p`xPARJHBNbXxfrjaP(lx{Kq5Vj%qMM`vH7 zD;Z4o8acd}0=qM3M&joV55*zcA6a^odb2t7OR1S9Q#bGF#%AP_Nkz8hS}>21Bdr(T-5CirG?){QlOm!=-?mXFX1x# Ak^lez literal 0 HcmV?d00001 diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000000.crc b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000000.crc new file mode 100644 index 00000000000..6c569f076b0 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000000.crc @@ -0,0 +1 @@ +{"tableSizeBytes":0,"numFiles":0,"numMetadata":1,"numProtocol":1,"protocol":{"minReaderVersion":1,"minWriterVersion":4},"metadata":{"id":"332a246e-eea2-43a5-8422-b65526c6cbe9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["birthday"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1664214549691},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"txnId":"eabe3042-b6e7-4710-aa7f-70ab5cfac6cb"} diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000000.json b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000000.json new file mode 100644 index 00000000000..58c4a35c8c2 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1664214549848,"userId":"7953272455820895","userName":"lin.zhou@databricks.com","operation":"CREATE TABLE","operationParameters":{"isManaged":"true","description":null,"partitionBy":"[\"birthday\"]","properties":"{\"delta.enableChangeDataFeed\":\"true\"}"},"notebook":{"notebookId":"3194333061073108"},"clusterId":"0819-204509-hill72","isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/11.x-snapshot-aarch64-scala2.12","txnId":"eabe3042-b6e7-4710-aa7f-70ab5cfac6cb"}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":4}} +{"metaData":{"id":"332a246e-eea2-43a5-8422-b65526c6cbe9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["birthday"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1664214549691}} diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000001.crc b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000001.crc new file mode 100644 index 00000000000..6dae28b0a63 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000001.crc @@ -0,0 +1 @@ +{"tableSizeBytes":791,"numFiles":1,"numMetadata":1,"numProtocol":1,"protocol":{"minReaderVersion":1,"minWriterVersion":4},"metadata":{"id":"332a246e-eea2-43a5-8422-b65526c6cbe9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["birthday"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1664214549691},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[791,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"txnId":"f7b5a77a-f86d-40b6-b266-5da97aecbe6e"} diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000001.json b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000001.json new file mode 100644 index 00000000000..d3ac164e863 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1664214552033,"userId":"7953272455820895","userName":"lin.zhou@databricks.com","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"3194333061073108"},"clusterId":"0819-204509-hill72","readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"791"},"engineInfo":"Databricks-Runtime/11.x-snapshot-aarch64-scala2.12","txnId":"f7b5a77a-f86d-40b6-b266-5da97aecbe6e"}} +{"add":{"path":"birthday=2020-01-01/part-00000-21994048-f0e7-4655-a705-09e597411943.c000.snappy.parquet","partitionValues":{"birthday":"2020-01-01"},"size":791,"modificationTime":1664214552000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"name\":\"1\",\"age\":1},\"maxValues\":{\"name\":\"1\",\"age\":1},\"nullCount\":{\"name\":0,\"age\":0}}","tags":{"INSERTION_TIME":"1664214552000000","MIN_INSERTION_TIME":"1664214552000000","MAX_INSERTION_TIME":"1664214552000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000002.crc b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000002.crc new file mode 100644 index 00000000000..def11e4ee40 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000002.crc @@ -0,0 +1 @@ +{"tableSizeBytes":1582,"numFiles":2,"numMetadata":1,"numProtocol":1,"protocol":{"minReaderVersion":1,"minWriterVersion":4},"metadata":{"id":"332a246e-eea2-43a5-8422-b65526c6cbe9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["birthday"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1664214549691},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[1582,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"txnId":"7c26a223-ac3d-4368-9f92-efcf5fd1e600"} diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000002.json b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000002.json new file mode 100644 index 00000000000..3fff7e36f05 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000002.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1664214553727,"userId":"7953272455820895","userName":"lin.zhou@databricks.com","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"3194333061073108"},"clusterId":"0819-204509-hill72","readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"791"},"engineInfo":"Databricks-Runtime/11.x-snapshot-aarch64-scala2.12","txnId":"7c26a223-ac3d-4368-9f92-efcf5fd1e600"}} +{"add":{"path":"birthday=2020-01-01/part-00000-d6f5bfd1-5f1e-4e56-86eb-893d16d8ab7b.c000.snappy.parquet","partitionValues":{"birthday":"2020-01-01"},"size":791,"modificationTime":1664214554000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"name\":\"2\",\"age\":2},\"maxValues\":{\"name\":\"2\",\"age\":2},\"nullCount\":{\"name\":0,\"age\":0}}","tags":{"INSERTION_TIME":"1664214554000000","MIN_INSERTION_TIME":"1664214554000000","MAX_INSERTION_TIME":"1664214554000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000003.crc b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000003.crc new file mode 100644 index 00000000000..66e8905d84f --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000003.crc @@ -0,0 +1 @@ +{"tableSizeBytes":2373,"numFiles":3,"numMetadata":1,"numProtocol":1,"protocol":{"minReaderVersion":1,"minWriterVersion":4},"metadata":{"id":"332a246e-eea2-43a5-8422-b65526c6cbe9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["birthday"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1664214549691},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[3,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[2373,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"txnId":"74164bc6-d4d4-48f6-b69c-983d95b0fb01"} diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000003.json b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000003.json new file mode 100644 index 00000000000..8b966f5af5c --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000003.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1664214555352,"userId":"7953272455820895","userName":"lin.zhou@databricks.com","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"3194333061073108"},"clusterId":"0819-204509-hill72","readVersion":2,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"791"},"engineInfo":"Databricks-Runtime/11.x-snapshot-aarch64-scala2.12","txnId":"74164bc6-d4d4-48f6-b69c-983d95b0fb01"}} +{"add":{"path":"birthday=2020-01-01/part-00000-aebb2514-795c-4709-aec0-7b486bef2db1.c000.snappy.parquet","partitionValues":{"birthday":"2020-01-01"},"size":791,"modificationTime":1664214556000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"name\":\"3\",\"age\":3},\"maxValues\":{\"name\":\"3\",\"age\":3},\"nullCount\":{\"name\":0,\"age\":0}}","tags":{"INSERTION_TIME":"1664214556000000","MIN_INSERTION_TIME":"1664214556000000","MAX_INSERTION_TIME":"1664214556000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000004.crc b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000004.crc new file mode 100644 index 00000000000..b8470af60b3 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000004.crc @@ -0,0 +1 @@ +{"tableSizeBytes":803,"numFiles":1,"numMetadata":1,"numProtocol":1,"protocol":{"minReaderVersion":1,"minWriterVersion":4},"metadata":{"id":"332a246e-eea2-43a5-8422-b65526c6cbe9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["birthday"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1664214549691},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[803,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"txnId":"7136ee6c-04c0-4470-ada0-f12c77f0cef4"} diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000004.json b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000004.json new file mode 100644 index 00000000000..97d61a597f6 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000004.json @@ -0,0 +1,5 @@ +{"commitInfo":{"timestamp":1664214570422,"userId":"7953272455820895","userName":"lin.zhou@databricks.com","operation":"OPTIMIZE","operationParameters":{"predicate":"[]","zOrderBy":"[]","batchId":"0","auto":false},"notebook":{"notebookId":"3194333061073108"},"clusterId":"0819-204509-hill72","readVersion":3,"isolationLevel":"SnapshotIsolation","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"3","numRemovedBytes":"2373","p25FileSize":"803","minFileSize":"803","numAddedFiles":"1","maxFileSize":"803","p75FileSize":"803","p50FileSize":"803","numAddedBytes":"803"},"engineInfo":"Databricks-Runtime/11.x-snapshot-aarch64-scala2.12","txnId":"7136ee6c-04c0-4470-ada0-f12c77f0cef4"}} +{"remove":{"path":"birthday=2020-01-01/part-00000-21994048-f0e7-4655-a705-09e597411943.c000.snappy.parquet","deletionTimestamp":1664214569941,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{"birthday":"2020-01-01"},"size":791,"tags":{"MAX_INSERTION_TIME":"1664214552000000","INSERTION_TIME":"1664214552000000","MIN_INSERTION_TIME":"1664214552000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"remove":{"path":"birthday=2020-01-01/part-00000-d6f5bfd1-5f1e-4e56-86eb-893d16d8ab7b.c000.snappy.parquet","deletionTimestamp":1664214569941,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{"birthday":"2020-01-01"},"size":791,"tags":{"MAX_INSERTION_TIME":"1664214554000000","INSERTION_TIME":"1664214554000000","MIN_INSERTION_TIME":"1664214554000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"remove":{"path":"birthday=2020-01-01/part-00000-aebb2514-795c-4709-aec0-7b486bef2db1.c000.snappy.parquet","deletionTimestamp":1664214569941,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{"birthday":"2020-01-01"},"size":791,"tags":{"MAX_INSERTION_TIME":"1664214556000000","INSERTION_TIME":"1664214556000000","MIN_INSERTION_TIME":"1664214556000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"birthday=2020-01-01/part-00000-29c991a0-4944-4271-9d69-d02fd204dd63.c000.snappy.parquet","partitionValues":{"birthday":"2020-01-01"},"size":803,"modificationTime":1664214571000,"dataChange":false,"stats":"{\"numRecords\":3,\"minValues\":{\"name\":\"1\",\"age\":1},\"maxValues\":{\"name\":\"3\",\"age\":3},\"nullCount\":{\"name\":0,\"age\":0}}","tags":{"MAX_INSERTION_TIME":"1664214556000000","INSERTION_TIME":"1664214552000000","MIN_INSERTION_TIME":"1664214552000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000005.crc b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000005.crc new file mode 100644 index 00000000000..fdb45f21890 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000005.crc @@ -0,0 +1 @@ +{"tableSizeBytes":1011,"numFiles":1,"numMetadata":1,"numProtocol":1,"protocol":{"minReaderVersion":1,"minWriterVersion":4},"metadata":{"id":"332a246e-eea2-43a5-8422-b65526c6cbe9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["birthday"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1664214549691},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[1011,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"txnId":"8430c3d9-c6e4-4f05-9923-39433a0b694b"} diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000005.json b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000005.json new file mode 100644 index 00000000000..e50fe0f09ce --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000005.json @@ -0,0 +1,4 @@ +{"commitInfo":{"timestamp":1664214587878,"userId":"7953272455820895","userName":"lin.zhou@databricks.com","operation":"DELETE","operationParameters":{"predicate":"[\"(spark_catalog.default.stream_table_optimize.age = 1)\"]"},"notebook":{"notebookId":"3194333061073108"},"clusterId":"0819-204509-hill72","readVersion":4,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numCopiedRows":"2","numAddedChangeFiles":"1","executionTimeMs":"1152","numDeletedRows":"1","scanTimeMs":"530","numAddedFiles":"1","rewriteTimeMs":"621"},"engineInfo":"Databricks-Runtime/11.x-snapshot-aarch64-scala2.12","txnId":"8430c3d9-c6e4-4f05-9923-39433a0b694b"}} +{"remove":{"path":"birthday=2020-01-01/part-00000-29c991a0-4944-4271-9d69-d02fd204dd63.c000.snappy.parquet","deletionTimestamp":1664214587877,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{"birthday":"2020-01-01"},"size":803,"tags":{"MAX_INSERTION_TIME":"1664214556000000","INSERTION_TIME":"1664214552000000","MIN_INSERTION_TIME":"1664214552000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"birthday=2020-01-01/part-00000-8da4d42d-b38d-47c9-9a04-75ab604160da.c000.snappy.parquet","partitionValues":{"birthday":"2020-01-01"},"size":1011,"modificationTime":1664214588000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"name\":\"2\",\"age\":2},\"maxValues\":{\"name\":\"3\",\"age\":3},\"nullCount\":{\"name\":0,\"age\":0,\"_change_type\":2}}","tags":{"MAX_INSERTION_TIME":"1664214556000000","INSERTION_TIME":"1664214552000000","MIN_INSERTION_TIME":"1664214552000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"cdc":{"path":"_change_data/birthday=2020-01-01/cdc-00000-cb387749-d66a-4f65-8d62-fc008bfb7b57.c000.snappy.parquet","partitionValues":{"birthday":"2020-01-01"},"size":1062,"dataChange":false}} diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000006.crc b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000006.crc new file mode 100644 index 00000000000..682c1be915d --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000006.crc @@ -0,0 +1 @@ +{"tableSizeBytes":1008,"numFiles":1,"numMetadata":1,"numProtocol":1,"protocol":{"minReaderVersion":1,"minWriterVersion":4},"metadata":{"id":"332a246e-eea2-43a5-8422-b65526c6cbe9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["birthday"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1664214549691},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[1008,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"txnId":"67ad682a-e379-4316-a372-764ccb1224e3"} diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000006.json b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000006.json new file mode 100644 index 00000000000..04b6911ed58 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/_delta_log/00000000000000000006.json @@ -0,0 +1,4 @@ +{"commitInfo":{"timestamp":1664214590526,"userId":"7953272455820895","userName":"lin.zhou@databricks.com","operation":"DELETE","operationParameters":{"predicate":"[\"(spark_catalog.default.stream_table_optimize.age = 2)\"]"},"notebook":{"notebookId":"3194333061073108"},"clusterId":"0819-204509-hill72","readVersion":5,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numCopiedRows":"1","numAddedChangeFiles":"1","executionTimeMs":"1287","numDeletedRows":"1","scanTimeMs":"753","numAddedFiles":"1","rewriteTimeMs":"534"},"engineInfo":"Databricks-Runtime/11.x-snapshot-aarch64-scala2.12","txnId":"67ad682a-e379-4316-a372-764ccb1224e3"}} +{"remove":{"path":"birthday=2020-01-01/part-00000-8da4d42d-b38d-47c9-9a04-75ab604160da.c000.snappy.parquet","deletionTimestamp":1664214590525,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{"birthday":"2020-01-01"},"size":1011,"tags":{"MAX_INSERTION_TIME":"1664214556000000","INSERTION_TIME":"1664214552000000","MIN_INSERTION_TIME":"1664214552000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"birthday=2020-01-01/part-00000-a0aceea7-2c64-467a-b05f-f56c8f582226.c000.snappy.parquet","partitionValues":{"birthday":"2020-01-01"},"size":1008,"modificationTime":1664214591000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"name\":\"3\",\"age\":3},\"maxValues\":{\"name\":\"3\",\"age\":3},\"nullCount\":{\"name\":0,\"age\":0,\"_change_type\":1}}","tags":{"MAX_INSERTION_TIME":"1664214556000000","INSERTION_TIME":"1664214552000000","MIN_INSERTION_TIME":"1664214552000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"cdc":{"path":"_change_data/birthday=2020-01-01/cdc-00000-993231ce-e967-4842-81ec-0bc894621351.c000.snappy.parquet","partitionValues":{"birthday":"2020-01-01"},"size":1062,"dataChange":false}} diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/birthday=2020-01-01/part-00000-21994048-f0e7-4655-a705-09e597411943.c000.snappy.parquet b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/birthday=2020-01-01/part-00000-21994048-f0e7-4655-a705-09e597411943.c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..8b2e69c6d8d05e42401ab723e30e45d66d828512 GIT binary patch literal 791 zcmah|Pm9w)6o1Lq-Qe~jbcTrpEWs^ZNXNu(*S7Scu1f`xBI{*UWRgy5*fi}Xsj`$_ z)zcoukKjqX2z~*-iXXy*_$IAd=s`&4<<0xOKffO@gS$^W0)&$5$$AXlAl=pO2O2k<1!6U0c;l-91NuauEnNH2HNZD*{gMkec zj_6dUN3>rvdN>6>6zYY=R>%O`0MH*Asn8D^v7t6VDyw`|wn)E6k86Y8a^U5OMT+s4 z-g>E@s?;})aG-BBs@Dv9sa25Ib7oG`5f@W|TzEl?=h0Z6P7=v>9^}ETrziJ;SsS2O zjznk)6jJbv&QRoTrmFd@OS@6uDp@wYi;N zOr_nmvpk&zx$W3tB;#Rbcc0B|r5}b0Ho4{EP&nEps`fqNaXSag;Qdju2?n3X>}z-)#Ki6?lu%ZKaGO3tl_%u zLACDBqIg(shF;5UicY&NL%$gczti&lu;U)Iy8|S5OcsOI>8G$RwG>uxZ*&Qe`Q< zc=QwO$&)A9lSe;`2!bELkKjvMwa|lyyqWjj@BR7xm<%7B*cjn^nBgDaudZ+ESOHhT zDnfWDBZMkD5?H%1hlOza+-2?3bQ{;&Gg645(bG*(I+^IVKTxI&DxHLKPnt*>RM+sr;> zR;%FWrjd7^wJggzsyE$v6#I22u-lfyd)=-GTqodeukE@)&pPVbejqGx@3*<=c>9_O P536`v@8N+A;rI9rvh=$4 literal 0 HcmV?d00001 diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/birthday=2020-01-01/part-00000-8da4d42d-b38d-47c9-9a04-75ab604160da.c000.snappy.parquet b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/birthday=2020-01-01/part-00000-8da4d42d-b38d-47c9-9a04-75ab604160da.c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..a3f0b3a91aab6ccac9f568b64e61a616c089acd3 GIT binary patch literal 1011 zcma)*Pm9w)7{(`QtVN2f&>1EWv;?{^K`!1-xEOz_x_oL(K8A23QQ`?1%hO=r7WzwKY zB)~FJNQZO_nRXo&B?~G5Ay6vpj1c)E;~e=eVI39v8fZx(#ZGbH6VfU4+evjaE|s8S zTx&8~P4z>hdM#rE6;7lJ75{0CleWkX5be6~=3K~M858nB#y1i9ED@!UA*WIfJbO;n zNiZ_llz}-$?2{3>W8R z(OYX~Bz`_%5$}Qh!qWcIoANlwFU`zXjLNoQmW4}p;5vt4*)+{+zG%-}Z;-c!m6~a> rW~0G}wl!pSvu4}Frde%N219Pb@NSLqy0fLW;npV)(&5+IgWvrhELQ7X literal 0 HcmV?d00001 diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/birthday=2020-01-01/part-00000-a0aceea7-2c64-467a-b05f-f56c8f582226.c000.snappy.parquet b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/birthday=2020-01-01/part-00000-a0aceea7-2c64-467a-b05f-f56c8f582226.c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..d688d932270243dc6d3ca8fb8f558d8e02b994e5 GIT binary patch literal 1008 zcma)*L2DC16vt;LZ45OBh0HK3LBnE03dy>0o0x=9P-}@$BuKneCCqLn>C(+6-Q6f9 z+uwWhpa1N%?>{XQL@2pI-hBP^ zc0EflOp_#q(A5G)2qhE+-X;VygprRw-o00rn2p;plK=*gG`U93|D65WSVEGS2tn#~ ztTM%v55X!$abQhmsx0ajVMS#SLUM|cnx!ey0$h&TWN$t?RmhGU0<_;s2V?FZbp!iQ z3^_@Hj!n$gJU$egXg?cK5J>QWh(D#f_96EMq8px!7YM~_R&C}}S&~*OD3t_FvH-yp z(_PxiruV;8d?zg6Fbz3THg_FFVGs#rXD|OI|H^>aOK5mGL+sXy z8L^+cJQN+UUs`%F_og)Tb8|CuWxae?H%iha%XjRfpkx@vcCKJe9JiM<`{jyZ@_MZ% o`j*+}R=r|b{kpMTEBE@sfa$#o7gc*pZNk4FH&lRo)`q+O7tEyT{Qv*} literal 0 HcmV?d00001 diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/birthday=2020-01-01/part-00000-aebb2514-795c-4709-aec0-7b486bef2db1.c000.snappy.parquet b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/birthday=2020-01-01/part-00000-aebb2514-795c-4709-aec0-7b486bef2db1.c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..8db825f6c37374a369ac862ce67fa9afac8bf9bb GIT binary patch literal 791 zcmah|&5F}d6uwD2W6*Ad-pkDpummS`AiXALI@6YJ)N!aF5~OZMMQ+lY8YWFUNvaH` zTXChh5qIL=2XNst_y9hD58=v_AEnTRkld4d&Ub#kj}!OKV;dt(@E(5u0@CP{EQ;A~=wzyAFEMcTkzy;)*vfKji@DwUl4#j;8&N4*Hq!c=aJD0l>M zq-w;)r}-B|hNDKh;K})m)PE^v9M=(U?8~$s^O@L3548dXjvAi|L}XP{O+(ReL+$P& zMJ;hyCNeo8BSRU&6z~w#E-bVH2G9l}@=L-^@=?M%GF+fi<>zIK^tdcZ>B}2<*_HAZ$a(-SC`sZoK zvaG{a+gpawq~!#5*K&Bj*As!~1l;R)Jum26hdp}|2n(*qT`rFN1J#7}$~WvSEQkxg F#2+IkxeEXQ literal 0 HcmV?d00001 diff --git a/kernel/kernel-defaults/src/test/resources/stream_table_optimize/birthday=2020-01-01/part-00000-d6f5bfd1-5f1e-4e56-86eb-893d16d8ab7b.c000.snappy.parquet b/kernel/kernel-defaults/src/test/resources/stream_table_optimize/birthday=2020-01-01/part-00000-d6f5bfd1-5f1e-4e56-86eb-893d16d8ab7b.c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..64b135b8b9696624c0e4e09d3518de38b53a92e3 GIT binary patch literal 791 zcmah|&5F}d6u!ySF*@A{y_XvaSb`HekY1B^I<}=7bsQ>)1gV=*k(=};hDp;-k}5;# zN)#93QgrQ0%!~K{ZhQz=p8P0{7HKO1##F45I z2cP6$5E+f@>4GO`uTt-|m~mV~xV|sbTFhr+A3fFz6gYNxAP|vNO*IWg!*#W{ixjoQ zVVTI}n2Zf&1XI96P^+-e3K&2egvc)mH^>(W>&S3`N|m3NEz<9k6T=ym1D7WhDaJ2v zo27o|r7k7R5cwbx*-*%5sR6uRP<5UJj4wEFVFfLgg)?zFj|IK@F!Sy_n>;{N+X6)* z;NHcfCzHVuMTpWZz#EaI;XI~Sx~$7=L=UzQ>3PIJ$MdDgSTcXhW+LNLo^kqBTIt!v zLg+(1&61^;>89?7BATZ9@cBy5b%s!3%ne`|2LOzhQN-N{5RfWN{clx$xLU8xf7u`+ zfU7QHoQXgr|I2>4M)cMejZ%qYo{0$vmy2;)v1-qfW+g+@W_E{J?VO*Rgx*=&wk+$g z*>aa*G;MZ$yJvOzpx+n1+x59S=((;xunzn7)E5?9@AkMj@(xrJ)+^tzkFX#P{1Sfv DmQlFb literal 0 HcmV?d00001 diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala index bdda713267f..679a118979c 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala @@ -331,7 +331,7 @@ class CreateCheckpointSuite extends DeltaTableWriteSuiteBase { test("create a checkpoint on a existing table") { withTempDirAndEngine { (tablePath, tc) => - copyTable("time-travel-start-start20-start40", tablePath) + copyGoldenTable("time-travel-start-start20-start40", tablePath) // before creating checkpoint, read and save the expected results using Spark val expResults = readUsingSpark(tablePath) @@ -345,7 +345,7 @@ class CreateCheckpointSuite extends DeltaTableWriteSuiteBase { test("try create a checkpoint on a unsupported table feature table") { withTempDirAndEngine { (tablePath, tc) => - copyTable("dv-with-columnmapping", tablePath) + copyGoldenTable("dv-with-columnmapping", tablePath) val ex2 = intercept[Exception] { kernelCheckpoint(tc, tablePath, checkpointVersion = 5) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala index 89794e578d8..6ebbdc30ced 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala @@ -34,6 +34,15 @@ import scala.collection.JavaConverters._ class DeltaTableReadsSuite extends AnyFunSuite with TestUtils { + test("CRC metadata & protocol fetching") { + val path = getTestResourceFilePath("stream_table_optimize") + + val snapshot = Table.forPath(defaultEngine, path) + .getSnapshotAsOfVersion(defaultEngine, 0) + + readSnapshot(snapshot) + } + ////////////////////////////////////////////////////////////////////////////////// // Timestamp type tests ////////////////////////////////////////////////////////////////////////////////// diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala index 99188078102..bd27965e826 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala @@ -19,7 +19,6 @@ import io.delta.golden.GoldenTableUtils.goldenTablePath import io.delta.kernel.defaults.engine.DefaultEngine import io.delta.kernel.defaults.utils.TestUtils import io.delta.kernel.engine.Engine -import io.delta.kernel.internal.util.FileNames.checkpointFileSingular import io.delta.kernel.{Table, TransactionCommitResult} import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration @@ -92,7 +91,7 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { assert(new File(cpPath).exists()) } - def copyTable(goldenTableName: String, targetLocation: String): Unit = { + def copyGoldenTable(goldenTableName: String, targetLocation: String): Unit = { val source = new File(goldenTablePath(goldenTableName)) val target = new File(targetLocation) FileUtils.copyDirectory(source, target) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala index bbddfab5dae..90586488f32 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala @@ -16,11 +16,11 @@ package io.delta.kernel.defaults -import java.io.File import io.delta.kernel.Table -import io.delta.kernel.engine.{Engine, ExpressionHandler, FileSystemClient} import io.delta.kernel.data.ColumnarBatch import io.delta.kernel.defaults.engine.{DefaultEngine, DefaultJsonHandler, DefaultParquetHandler} +import io.delta.kernel.defaults.utils.TestUtils +import io.delta.kernel.engine.{Engine, ExpressionHandler, FileSystemClient} import io.delta.kernel.expressions.Predicate import io.delta.kernel.internal.checkpoints.Checkpointer import io.delta.kernel.internal.fs.Path @@ -29,27 +29,24 @@ import io.delta.kernel.internal.util.Utils.toCloseableIterator import io.delta.kernel.types.StructType import io.delta.kernel.utils.{CloseableIterator, FileStatus} import org.apache.hadoop.conf.Configuration -import org.apache.spark.sql.QueryTest import org.apache.spark.sql.delta.DeltaLog import org.apache.spark.sql.delta.sources.DeltaSQLConf -import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.functions.col -import org.apache.spark.sql.test.SharedSparkSession +import org.scalatest.funsuite.AnyFunSuite +import java.io.File import java.nio.file.Files import java.util.Optional import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable.ArrayBuffer -class LogReplayMetricsSuite extends QueryTest - with SharedSparkSession - with DeltaSQLCommandTest { +class LogReplayMetricsSuite extends AnyFunSuite with TestUtils { ///////////////////////// // Test Helper Methods // ///////////////////////// - private def withTempDirAndEngine(f: (File, MetricsEngine) => Unit): Unit = { + private def withTempDirAndEngine(f: (String, MetricsEngine) => Unit): Unit = { val engine = new MetricsEngine(new Configuration() { { // Set the batch sizes to small so that we get to test the multiple batch scenarios. @@ -57,23 +54,25 @@ class LogReplayMetricsSuite extends QueryTest set("delta.kernel.default.json.reader.batch-size", "2"); } }) - withTempDir { dir => f(dir, engine) } + withTempDir { dir => f(dir.getAbsolutePath, engine) } } private def loadPandMCheckMetrics( + snapshotFetchCall: => StructType, engine: MetricsEngine, - table: Table, expJsonVersionsRead: Seq[Long], expParquetVersionsRead: Seq[Long], - expParquetReadSetSizes: Seq[Long] = Nil): Unit = { + expParquetReadSetSizes: Seq[Long] = null, + expChecksumReadSet: Seq[Long] = null): Unit = { engine.resetMetrics() - table.getLatestSnapshot(engine).getSchema(engine) + snapshotFetchCall assertMetrics( engine, expJsonVersionsRead, expParquetVersionsRead, - expParquetReadSetSizes) + expParquetReadSetSizes, + expChecksumReadSet = expChecksumReadSet) } private def loadScanFilesCheckMetrics( @@ -101,8 +100,9 @@ class LogReplayMetricsSuite extends QueryTest engine: MetricsEngine, expJsonVersionsRead: Seq[Long], expParquetVersionsRead: Seq[Long], - expParquetReadSetSizes: Seq[Long], - expLastCheckpointReadCalls: Option[Int] = None): Unit = { + expParquetReadSetSizes: Seq[Long] = null, + expLastCheckpointReadCalls: Option[Int] = None, + expChecksumReadSet: Seq[Long] = null): Unit = { val actualJsonVersionsRead = engine.getJsonHandler.getVersionsRead val actualParquetVersionsRead = engine.getParquetHandler.getVersionsRead @@ -115,7 +115,7 @@ class LogReplayMetricsSuite extends QueryTest s"versions $expParquetVersionsRead but read $actualParquetVersionsRead" ) - if (expParquetReadSetSizes.nonEmpty) { + if (expParquetReadSetSizes != null) { val actualParquetReadSetSizes = engine.getParquetHandler.checkpointReadRequestSizes assert( actualParquetReadSetSizes === expParquetReadSetSizes, s"Expected parquet read set sizes " + @@ -128,6 +128,14 @@ class LogReplayMetricsSuite extends QueryTest assert(actualCalls === expCalls, s"Expected to read last checkpoint metadata $expCalls times but read $actualCalls times") } + + if (expChecksumReadSet != null) { + val actualChecksumReadSet = engine.getJsonHandler.checksumsRead + assert( + actualChecksumReadSet === expChecksumReadSet, s"Expected checksum read set " + + s"$expChecksumReadSet but read $actualChecksumReadSet" + ) + } } private def appendCommit(path: String): Unit = @@ -144,31 +152,35 @@ class LogReplayMetricsSuite extends QueryTest /////////// test("no hint, no checkpoint, reads all files") { - withTempDirAndEngine { (dir, tc) => - val path = dir.getAbsolutePath - + withTempDirAndEngine { (path, engine) => for (_ <- 0 to 9) { appendCommit(path) } - val table = Table.forPath(tc, path) - loadPandMCheckMetrics(tc, table, 9L to 0L by -1L, Nil) + val table = Table.forPath(engine, path) + loadPandMCheckMetrics( + table.getLatestSnapshot(engine).getSchema(engine), + engine, + expJsonVersionsRead = 9L to 0L by -1L, + expParquetVersionsRead = Nil + ) } } test("no hint, existing checkpoint, reads all files up to that checkpoint") { - withTempDirAndEngine { (dir, tc) => - val path = dir.getAbsolutePath - + withTempDirAndEngine { (path, engine) => for (_ <- 0 to 14) { appendCommit(path) } - val table = Table.forPath(tc, path) - loadPandMCheckMetrics(tc, table, 14L to 11L by -1L, Seq(10), Seq(1)) + val table = Table.forPath(engine, path) + loadPandMCheckMetrics( + table.getLatestSnapshot(engine).getSchema(engine), + engine, + expJsonVersionsRead = 14L to 11L by -1L, + expParquetVersionsRead = Seq(10), + expParquetReadSetSizes = Seq(1)) } } test("no hint, existing checkpoint, newer P & M update, reads up to P & M commit") { - withTempDirAndEngine { (dir, tc) => - val path = dir.getAbsolutePath - + withTempDirAndEngine { (path, engine) => for (_ <- 0 to 12) { appendCommit(path) } // v13 changes the protocol (which also updates the metadata) @@ -182,69 +194,82 @@ class LogReplayMetricsSuite extends QueryTest for (_ <- 14 to 16) { appendCommit(path) } - val table = Table.forPath(tc, path) - loadPandMCheckMetrics(tc, table, 16L to 13L by -1L, Nil) + val table = Table.forPath(engine, path) + loadPandMCheckMetrics( + table.getLatestSnapshot(engine).getSchema(engine), + engine, + expJsonVersionsRead = 16L to 13L by -1L, + expParquetVersionsRead = Nil) } } test("hint with no new commits, should read no files") { - withTempDirAndEngine { (dir, tc) => - val path = dir.getAbsolutePath - + withTempDirAndEngine { (path, engine) => for (_ <- 0 to 14) { appendCommit(path) } - val table = Table.forPath(tc, path) + val table = Table.forPath(engine, path) - table.getLatestSnapshot(tc).getSchema(tc) + table.getLatestSnapshot(engine).getSchema(engine) // A hint is now saved at v14 - - loadPandMCheckMetrics(tc, table, Nil, Nil) + loadPandMCheckMetrics( + table.getLatestSnapshot(engine).getSchema(engine), + engine, + expJsonVersionsRead = Nil, + expParquetVersionsRead = Nil) } } test("hint with no P or M updates") { - withTempDirAndEngine { (dir, tc) => - val path = dir.getAbsolutePath - + withTempDirAndEngine { (path, engine) => for (_ <- 0 to 14) { appendCommit(path) } - val table = Table.forPath(tc, path) + val table = Table.forPath(engine, path) - table.getLatestSnapshot(tc).getSchema(tc) + table.getLatestSnapshot(engine).getSchema(engine) // A hint is now saved at v14 // Case: only one version change appendCommit(path) // v15 - loadPandMCheckMetrics(tc, table, Seq(15), Nil) + loadPandMCheckMetrics( + table.getLatestSnapshot(engine).getSchema(engine), + engine, + expJsonVersionsRead = Seq(15), + expParquetVersionsRead = Nil) // A hint is now saved at v15 // Case: several version changes for (_ <- 16 to 19) { appendCommit(path) } - loadPandMCheckMetrics(tc, table, 19L to 16L by -1L, Nil) + loadPandMCheckMetrics( + table.getLatestSnapshot(engine).getSchema(engine), + engine, + expJsonVersionsRead = 19L to 16L by -1L, + expParquetVersionsRead = Nil) // A hint is now saved at v19 // Case: [delta-io/delta#2262] [Fix me!] Read the entire checkpoint at v20, even if v20.json // and v19 hint are available appendCommit(path) // v20 - loadPandMCheckMetrics(tc, table, Nil, Seq(20)) + loadPandMCheckMetrics( + table.getLatestSnapshot(engine).getSchema(engine), + engine, + expJsonVersionsRead = Nil, + expParquetVersionsRead = Seq(20)) } } test("hint with a P or M update") { - withTempDirAndEngine { (dir, tc) => - val path = dir.getAbsolutePath - + withTempDirAndEngine { (path, engine) => for (_ <- 0 to 3) { appendCommit(path) } - val table = Table.forPath(tc, path) + val table = Table.forPath(engine, path) - table.getLatestSnapshot(tc).getSchema(tc) + table.getLatestSnapshot(engine).getSchema(engine) // A hint is now saved at v3 @@ -257,8 +282,11 @@ class LogReplayMetricsSuite extends QueryTest .mode("append") .save(path) - loadPandMCheckMetrics(tc, table, Seq(4), Nil) - + loadPandMCheckMetrics( + table.getLatestSnapshot(engine).getSchema(engine), + engine, + expJsonVersionsRead = Seq(4), + expParquetVersionsRead = Nil) // a hint is now saved at v4 // v5 changes the protocol (which also updates the metadata) @@ -270,20 +298,22 @@ class LogReplayMetricsSuite extends QueryTest |) |""".stripMargin) - loadPandMCheckMetrics(tc, table, Seq(5), Nil) + loadPandMCheckMetrics( + table.getLatestSnapshot(engine).getSchema(engine), + engine, + expJsonVersionsRead = Seq(5), + expParquetVersionsRead = Nil) } } test("read a table with multi-part checkpoint") { - withTempDirAndEngine { (dir, tc) => - val path = dir.getAbsolutePath - + withTempDirAndEngine { (path, engine) => for (_ <- 0 to 14) { appendCommit(path) } // there should be one checkpoint file at version 10 loadScanFilesCheckMetrics( - tc, - Table.forPath(tc, path), + engine, + Table.forPath(engine, path), expJsonVersionsRead = 14L to 11L by -1L, expParquetVersionsRead = Seq(10), // we read the checkpoint twice: once for the P &M and once for the scan files @@ -293,25 +323,23 @@ class LogReplayMetricsSuite extends QueryTest checkpoint(path, actionsPerFile = 2) // Reset metrics. - tc.resetMetrics() + engine.resetMetrics() // expect the Parquet read set to contain one request with size of 15 loadScanFilesCheckMetrics( - tc, - Table.forPath(tc, path), + engine, + Table.forPath(engine, path), expJsonVersionsRead = Nil, expParquetVersionsRead = Seq(14), // we read the checkpoint twice: once for the P &M and once for the scan files - expParquetReadSetSizes = Seq(15, 15)) + expParquetReadSetSizes = Seq(8, 8)) } } Seq(true, false).foreach { deleteLastCheckpointMetadataFile => test("ensure `_last_checkpoint` is tried to read only once when " + s"""${if (deleteLastCheckpointMetadataFile) "not exists" else "valid file exists"}""") { - withTempDirAndEngine { (dir, tc) => - val path = dir.getAbsolutePath - + withTempDirAndEngine { (path, tc) => for (_ <- 0 to 14) { appendCommit(path) } if (deleteLastCheckpointMetadataFile) { @@ -333,6 +361,110 @@ class LogReplayMetricsSuite extends QueryTest } } } + + ///////////////////////////////////////////////////////////////////////////////////////////////// + // Tests for loading P & M through checksums files // + ///////////////////////////////////////////////////////////////////////////////////////////////// + + Seq(-1L, 3L, 4L).foreach { version => // -1 means latest version + test(s"checksum found at the read version: ${if (version == 1) "latest" else version}") { + withTempDirAndEngine { (_, engine) => + val goldenTable = getTestResourceFilePath("stream_table_optimize") + val table = Table.forPath(engine, goldenTable) + + loadPandMCheckMetrics( + version match { + case -1 => table.getLatestSnapshot(engine).getSchema(engine) + case ver => table.getSnapshotAsOfVersion(engine, ver).getSchema(engine) + }, + engine, + // shouldn't need to read commit or checkpoint files as P&M are found through checksum + expJsonVersionsRead = Nil, + expParquetVersionsRead = Nil, + expParquetReadSetSizes = Nil, + expChecksumReadSet = Seq(if (version == -1) 6 else version)) + } + } + } + + test("checksum not found at the read version, but found at a previous version") { + withTempDirAndEngine { (path, engine) => + // copy the golden table with crc files to the temp dir delete the checksum files + // for version 5 and 6 + copyTable(getTestResourceFilePath("stream_table_optimize"), path) + Seq(5L, 6L).foreach { version => + val crcFile = new File(path, f"_delta_log/$version%020d.crc") + assert(Files.deleteIfExists(crcFile.toPath)) + } + + loadPandMCheckMetrics( + Table.forPath(engine, path) + .getLatestSnapshot(engine).getSchema(engine), + engine, + // We find the checksum from crc at version 4, but still read commit files 5 and 6 + // to find the P&M which could have been updated in version 5 and 6. + expJsonVersionsRead = Seq(6, 5), + expParquetVersionsRead = Nil, + expParquetReadSetSizes = Nil, + // First attempted to read checksum for version 6, then we do a listing of + // last 100 crc files and read the latest one which is version 4 (as version 5 is deleted) + expChecksumReadSet = Seq(6, 4)) + + + // now try to load version 3 and it should get P&M from checksum files only + loadPandMCheckMetrics( + Table.forPath(engine, path) + .getSnapshotAsOfVersion(engine, 3 /* versionId */).getSchema(engine), + engine, + // We find the checksum from crc at version 3, so shouldn't read anything else + expJsonVersionsRead = Nil, + expParquetVersionsRead = Nil, + expParquetReadSetSizes = Nil, + expChecksumReadSet = Seq(3)) + } + } + + test("checksum not found at the read version, but uses snapshot hint lower bound") { + withTempDirAndEngine { (path, engine) => + // copy the golden table with crc files to the temp dir delete the checksum files + // for version 3 to 6 + copyTable(getTestResourceFilePath("stream_table_optimize"), path) + (3 to 6).foreach { version => + val crcFile = new File(path, f"_delta_log/$version%020d.crc") + assert(Files.deleteIfExists(crcFile.toPath)) + } + + val table = Table.forPath(engine, path) + + loadPandMCheckMetrics( + table.getSnapshotAsOfVersion(engine, 4 /* versionId */).getSchema(engine), + engine, + // There are no checksum files for versions 4. Latest is at version 2. + // We need to read the commit files 3 and 4 to get the P&M in addition the P&M from + // checksum file at version 2 + expJsonVersionsRead = Seq(4, 3), + expParquetVersionsRead = Nil, + expParquetReadSetSizes = Nil, + // First attempted to read checksum for version 4 which doesn't exists, + // then we do a listing of last 100 crc files and read the latest + // one which is version 2 (as version 3-6 are deleted) + expChecksumReadSet = Seq(4, 2)) + // read version 4 which sets the snapshot P&M hint to 4 + + // now try to load version 6 and we expect no checksums are read + loadPandMCheckMetrics( + table.getSnapshotAsOfVersion(engine, 6 /* versionId */).getSchema(engine), + engine, + // We have snapshot P&M hint at version 4, and no checksum after 2 + expJsonVersionsRead = Seq(6, 5), + expParquetVersionsRead = Nil, + expParquetReadSetSizes = Nil, + // First we attempt to read at version 6, then we do a listing of last 100 crc files + // bound by the snapshot hint which is at version 4 and we don't try to read checksums + // beyond version 4 + expChecksumReadSet = Seq(6)) + } + } } //////////////////// @@ -367,6 +499,8 @@ trait FileReadMetrics { self: Object => // number of times read is requested on `_last_checkpoint` private var lastCheckpointMetadataReadCalls = 0 + val checksumsRead = new ArrayBuffer[Long]() // versions of checksum files read + private val versionsRead = ArrayBuffer[Long]() // Number of checkpoint files requested read in each readParquetFiles call @@ -383,6 +517,8 @@ trait FileReadMetrics { self: Object => } } else if (Checkpointer.LAST_CHECKPOINT_FILE_NAME.equals(path.getName)) { lastCheckpointMetadataReadCalls += 1 + } else if (FileNames.isChecksumFile(path)) { + checksumsRead += FileNames.getFileVersion(path) } } @@ -394,6 +530,7 @@ trait FileReadMetrics { self: Object => lastCheckpointMetadataReadCalls = 0 versionsRead.clear() checkpointReadRequestSizes.clear() + checksumsRead.clear() } def collectReadFiles(fileIter: CloseableIterator[FileStatus]): CloseableIterator[FileStatus] = { diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala index a97aee85a70..975a944cde4 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala @@ -698,4 +698,8 @@ trait TestUtils extends Assertions with SQLHelper { } resource.getFile } + + def copyTable(sourcePath: String, targetPath: String): Unit = { + FileUtils.copyDirectory(new File(sourcePath), new File(targetPath)) + } }