Skip to content

Commit

Permalink
[Kernel] Support for loading protocol and metadata from checksum file…
Browse files Browse the repository at this point in the history
… in DeltaLog

Initial read current version CRC w test to verify

wip

end-2-end working + tests

Co-authored-by:  Allison Portis <[email protected]>
Co-authored-by: Venki Korukanti <[email protected]>
  • Loading branch information
vkorukanti and allisonport-db committed May 16, 2024
1 parent 2c444a2 commit 069a1f9
Show file tree
Hide file tree
Showing 32 changed files with 519 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.replay;

import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;

import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.FileNames;
import static io.delta.kernel.internal.replay.VersionStats.fromColumnarBatch;
import static io.delta.kernel.internal.util.FileNames.checksumFile;
import static io.delta.kernel.internal.util.FileNames.isChecksumFile;
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;

/**
* Utility method to load protocol and metadata from the Delta log checksum files.
*/
public class ChecksumReader {
private static final Logger logger = LoggerFactory.getLogger(ChecksumReader.class);

/**
* Load the protocol and metadata from the checksum file at the given version. If the checksum
* file is not found at the given version, it will try to find the latest checksum file that is
* created after the lower bound version or within the last 100 versions.
*
* @param engine the engine to use for reading the checksum file
* @param logPath the path to the Delta log
* @param readVersion the version to read the checksum file from
* @param lowerBoundOpt the lower bound version to search for the checksum file
* @return Optional {@link VersionStats} containing the protocol and metadata, and the version
* of the checksum file. If the checksum file is not found, it will return an empty
*/
public static Optional<VersionStats> getVersionStats(
Engine engine,
Path logPath,
long readVersion,
Optional<Long> lowerBoundOpt) {

// First try to load the CRC at given version. If not found or failed to read then try to
// find the latest CRC file that is created after the lower bound version or within the
// last 100 versions.
Path crcFilePath = checksumFile(logPath, readVersion);
Optional<VersionStats> versionStatsOpt = readChecksumFile(engine, crcFilePath);
if (versionStatsOpt.isPresent() ||
// we don't expect any more checksum files as it is the first version
readVersion == 0) {
return versionStatsOpt;
}

// Try to list the last 100 CRC files and see if we can find a CRC that we can use
long lowerBound = Math.max(
lowerBoundOpt.orElse(0L) + 1,
Math.max(0, readVersion - 100));

Path listFrom = checksumFile(logPath, lowerBound);
try (CloseableIterator<FileStatus> crcFiles =
engine.getFileSystemClient().listFrom(listFrom.toString())) {

List<FileStatus> crcFilesList = new ArrayList<>();
crcFiles.filter(file -> isChecksumFile(new Path(file.getPath())))
.forEachRemaining(crcFilesList::add);

// pick the last file which is the latest version that has the CRC file
if (crcFilesList.isEmpty()) {
logger.warn("No checksum files found in the range {} to {}", lowerBound,
readVersion);
return Optional.empty();
}

FileStatus latestCRCFile = crcFilesList.get(crcFilesList.size() - 1);
return readChecksumFile(engine, new Path(latestCRCFile.getPath()));
} catch (Exception e) {
logger.warn("Failed to list checksum files from {}", listFrom, e);
return Optional.empty();
}

}

private static Optional<VersionStats> readChecksumFile(Engine engine, Path filePath) {
try (CloseableIterator<ColumnarBatch> iter = engine.getJsonHandler()
.readJsonFiles(
singletonCloseableIterator(FileStatus.of(filePath.toString())),
VersionStats.FULL_SCHEMA,
Optional.empty())) {
// We do this instead of iterating through the rows or using `getSingularRow` so we
// can use the existing fromColumnVector methods in Protocol, Metadata, Format etc
if (!iter.hasNext()) {
logger.warn("Checksum file is empty: {}", filePath);
return Optional.empty();
}

ColumnarBatch batch = iter.next();
if (batch.getSize() != 1) {
String msg = "Expected exactly one row in the checksum file {}, found {} rows";
logger.warn(msg, filePath, batch.getSize());
return Optional.empty();
}

long crcVersion = FileNames.checksumVersion(filePath);

VersionStats versionStats = fromColumnarBatch(engine, crcVersion, batch, 0 /* rowId */);
if (versionStats.getMetadata() == null || versionStats.getProtocol() == null) {
logger.warn("Invalid checksum file missing protocol and/or metadata: {}", filePath);
return Optional.empty();
}
return Optional.of(versionStats);
} catch (Exception e) {
// This can happen when the version does not have a checksum file
logger.warn("Failed to read checksum file {}", filePath, e);
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,39 @@ protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
);
}

// Compute the lower bound for the CRC search
// If the snapshot hint is present, we can use it as the lower bound for the CRC search.
// TODO: this can be further improved to make the lower bound until the checkpoint version
Optional<Long> crcSearchLowerBound = snapshotHint.map(SnapshotHint::getVersion);

Optional<VersionStats> versionStatsOpt =
ChecksumReader.getVersionStats(
engine,
logSegment.logPath,
snapshotVersion,
crcSearchLowerBound);

if (versionStatsOpt.isPresent()) {
// We found the protocol and metadata for the version we are looking for
VersionStats versionStats = versionStatsOpt.get();
if (versionStats.getVersion() == snapshotVersion) {
return new Tuple2<>(
versionStats.getProtocol(),
versionStats.getMetadata()
);
}

// We found the protocol and metadata in a version older than the one we are looking
// for. We need to replay the actions to get the latest protocol and metadata, but
// update the hint to read the actions from the version we found to check if the
// protocol and metadata are updated in the versions after the one we found.
snapshotHint = Optional.of(new SnapshotHint(
versionStats.getVersion(),
versionStats.getProtocol(),
versionStats.getMetadata()
));
}

Protocol protocol = null;
Metadata metadata = null;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.replay;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.types.StructType;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;

public class VersionStats {

public static VersionStats fromColumnarBatch(
Engine engine, long version, ColumnarBatch batch, int rowId) {
// fromColumnVector already takes care of nulls
Protocol protocol = Protocol.fromColumnVector(
batch.getColumnVector(PROTOCOL_ORDINAL), rowId);
Metadata metadata = Metadata.fromColumnVector(
batch.getColumnVector(METADATA_ORDINAL), rowId, engine);
return new VersionStats(version, metadata, protocol);
}

// We can add additional fields later
public static final StructType FULL_SCHEMA = new StructType()
.add("protocol", Protocol.FULL_SCHEMA)
.add("metadata", Metadata.FULL_SCHEMA);

private static final int PROTOCOL_ORDINAL = 0;
private static final int METADATA_ORDINAL = 1;

private final long version;
private final Metadata metadata;
private final Protocol protocol;

protected VersionStats(long version, Metadata metadata, Protocol protocol) {
this.version = version;
this.metadata = metadata;
this.protocol = protocol;
}

/**
* The version of the Delta table that this VersionStats represents.
*/
public long getVersion() {
return version;
}

/**
* The {@link Metadata} stored in this VersionStats. May be null.
*/
public Metadata getMetadata() {
return metadata;
}

/**
* The {@link Protocol} stored in this VersionStats. May be null.
*/
public Protocol getProtocol() {
return protocol;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ private FileNames() {}
private static final Pattern MULTI_PART_CHECKPOINT_FILE_PATTERN =
Pattern.compile("(\\d+)\\.checkpoint\\.\\d+\\.\\d+\\.parquet");

private static final Pattern checksumFileRegex = Pattern.compile("(\\d+)\\.crc");

public static final String SIDECAR_DIRECTORY = "_sidecars";

/**
Expand Down Expand Up @@ -81,6 +83,17 @@ public static String sidecarFile(Path path, String sidecar) {
return String.format("%s/%s/%s", path.toString(), SIDECAR_DIRECTORY, sidecar);
}

/**
* Returns the path to the checksum file for the given version.
*/
public static Path checksumFile(Path path, long version) {
return new Path(path, String.format("%020d.crc", version));
}

public static long checksumVersion(Path path) {
return Long.parseLong(path.getName().split("\\.")[0]);
}

/**
* Returns the prefix of all delta log files for the given version.
* <p>
Expand Down Expand Up @@ -159,11 +172,14 @@ public static boolean isV2CheckpointFile(String fileName) {
return V2_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches();
}


public static boolean isCommitFile(String fileName) {
return DELTA_FILE_PATTERN.matcher(new Path(fileName).getName()).matches();
}

public static boolean isChecksumFile(Path checksumFilePath) {
return checksumFileRegex.matcher(checksumFilePath.getName()).matches();
}

/**
* Get the version of the checkpoint, checksum or delta file. Throws an error if an unexpected
* file type is seen. These unexpected files should be filtered out to ensure forward
Expand All @@ -175,12 +191,11 @@ public static long getFileVersion(Path path) {
return checkpointVersion(path);
} else if (isCommitFile(path.getName())) {
return deltaVersion(path);
//} else if (isChecksumFile(path)) {
// checksumVersion(path);
} else if (isChecksumFile(path)) {
return checksumVersion(path);
} else {
throw new IllegalArgumentException(
String.format("Unexpected file type found in transaction log: %s", path)
);
String.format("Unexpected file type found in transaction log: %s", path));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,14 @@ public long getModificationTime() {
public static FileStatus of(String path, long size, long modificationTime) {
return new FileStatus(path, size, modificationTime);
}

/**
* Create a {@link FileStatus} with the given path with size and modification time set to 0.
*
* @param path Fully qualified file path.
* @return {@link FileStatus} object
*/
public static FileStatus of(String path) {
return new FileStatus(path, 0 /* size */, 0 /* modTime */);
}
}
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"tableSizeBytes":0,"numFiles":0,"numMetadata":1,"numProtocol":1,"protocol":{"minReaderVersion":1,"minWriterVersion":4},"metadata":{"id":"332a246e-eea2-43a5-8422-b65526c6cbe9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["birthday"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1664214549691},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"txnId":"eabe3042-b6e7-4710-aa7f-70ab5cfac6cb"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1664214549848,"userId":"7953272455820895","userName":"[email protected]","operation":"CREATE TABLE","operationParameters":{"isManaged":"true","description":null,"partitionBy":"[\"birthday\"]","properties":"{\"delta.enableChangeDataFeed\":\"true\"}"},"notebook":{"notebookId":"3194333061073108"},"clusterId":"0819-204509-hill72","isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/11.x-snapshot-aarch64-scala2.12","txnId":"eabe3042-b6e7-4710-aa7f-70ab5cfac6cb"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":4}}
{"metaData":{"id":"332a246e-eea2-43a5-8422-b65526c6cbe9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["birthday"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1664214549691}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"tableSizeBytes":791,"numFiles":1,"numMetadata":1,"numProtocol":1,"protocol":{"minReaderVersion":1,"minWriterVersion":4},"metadata":{"id":"332a246e-eea2-43a5-8422-b65526c6cbe9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["birthday"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1664214549691},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[791,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"txnId":"f7b5a77a-f86d-40b6-b266-5da97aecbe6e"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1664214552033,"userId":"7953272455820895","userName":"[email protected]","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"3194333061073108"},"clusterId":"0819-204509-hill72","readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"791"},"engineInfo":"Databricks-Runtime/11.x-snapshot-aarch64-scala2.12","txnId":"f7b5a77a-f86d-40b6-b266-5da97aecbe6e"}}
{"add":{"path":"birthday=2020-01-01/part-00000-21994048-f0e7-4655-a705-09e597411943.c000.snappy.parquet","partitionValues":{"birthday":"2020-01-01"},"size":791,"modificationTime":1664214552000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"name\":\"1\",\"age\":1},\"maxValues\":{\"name\":\"1\",\"age\":1},\"nullCount\":{\"name\":0,\"age\":0}}","tags":{"INSERTION_TIME":"1664214552000000","MIN_INSERTION_TIME":"1664214552000000","MAX_INSERTION_TIME":"1664214552000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
Loading

0 comments on commit 069a1f9

Please sign in to comment.