diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/exceptions/KernelEngineException.java b/kernel/kernel-api/src/main/java/io/delta/kernel/exceptions/KernelEngineException.java new file mode 100644 index 00000000000..98af91bb1e2 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/exceptions/KernelEngineException.java @@ -0,0 +1,30 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.exceptions; + +import io.delta.kernel.engine.Engine; + +/** + * Thrown when the {@link Engine} encounters an error while executing an operation. 
+ */ +public class KernelEngineException extends RuntimeException { + private static final String msgTemplate = "Encountered an error from the underlying engine " + + "implementation while trying to %s: %s"; + + public KernelEngineException(String attemptedOperation, Throwable cause) { + super(String.format(msgTemplate, attemptedOperation, cause.getMessage()), cause); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/Checkpointer.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/Checkpointer.java index 7fcc648076c..79b3dd12d75 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/Checkpointer.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/Checkpointer.java @@ -15,8 +15,7 @@ */ package io.delta.kernel.internal.checkpoints; -import java.io.FileNotFoundException; -import java.io.IOException; +import java.io.*; import java.util.*; import java.util.stream.Collectors; @@ -26,6 +25,7 @@ import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.data.Row; import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.KernelEngineException; import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.FileStatus; @@ -239,22 +239,39 @@ private Optional loadMetadataFromFile(Engine engine, int tri "Retrying after 1sec. (current attempt = {})", lastCheckpointFilePath, tries); - Thread.sleep(1000); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Optional.empty(); + } return loadMetadataFromFile(engine, tries + 1); } - } catch (FileNotFoundException ex) { - return Optional.empty(); // there is no point retrying - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); + } catch (IOException ex) { + // TODO: this exception is thrown when `readJsonFiles` is called and not before + // the file is actually read. 
In current implementation this never happens as the file + // is actually read when the data is fetched from the returned `CloseableIterator` + // from `readJsonFiles`. Once we wrap all calls to engine and throw + // {@link KernelEngineException} instead of {@link IOException}, we can remove this + // catch block. return Optional.empty(); - } catch (Exception ex) { - String msg = String.format( - "Failed to load checkpoint metadata from file %s. " + - "Retrying after 1sec. (current attempt = %s)", - lastCheckpointFilePath, tries); - logger.warn(msg, ex); - // we can retry until max tries are exhausted - return loadMetadataFromFile(engine, tries + 1); + } catch (KernelEngineException ex) { + Throwable cause = ex.getCause(); + if (cause instanceof FileNotFoundException) { + return Optional.empty(); // there is no point retrying + } else if (cause instanceof Exception) { + String msg = String.format( + "Failed to load checkpoint metadata from file %s. " + + "It must be in the process of being written. " + + "Retrying after 1sec. (current attempt of %s (max 3)", + lastCheckpointFilePath, tries); + logger.warn(msg, cause); + // we can retry until max tries are exhausted. It saves latency as the alternative + // is to list files and find the last checkpoint file. And the `_last_checkpoint` + // file is possibly being written to. 
+ return loadMetadataFromFile(engine, tries + 1); + } + throw ex; } } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index c4876008341..1e93411cc26 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -260,7 +260,7 @@ private Optional> listFromOrNone( } catch (FileNotFoundException e) { return Optional.empty(); } catch (IOException io) { - throw new RuntimeException("Failed to list the files in delta log", io); + throw new UncheckedIOException("Failed to list the files in delta log", io); } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/CloseableIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/CloseableIterator.java index e0414cab0e1..2e3ba8b5afd 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/CloseableIterator.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/CloseableIterator.java @@ -23,6 +23,9 @@ import java.util.function.Function; import io.delta.kernel.annotation.Evolving; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.KernelEngineException; +import io.delta.kernel.exceptions.KernelException; import io.delta.kernel.internal.util.Utils; @@ -34,6 +37,41 @@ */ @Evolving public interface CloseableIterator extends Iterator, Closeable { + + /** + * Returns true if the iteration has more elements. (In other words, returns true if next would + * return an element rather than throwing an exception.) + * + * @return true if the iteration has more elements + * @throws KernelEngineException If any underlying exception occurs in {@link Engine} while + * trying to execute the operation. The original exception (if + * any) is wrapped in this exception as cause. E.g. 
+ * {@link IOException} thrown while trying to read from a Delta + * log file. It will be wrapped in this exception as cause. + * @throws KernelException When encountering an operation or state that is invalid or + * unsupported. + */ + @Override + boolean hasNext(); + + /** + * Returns the next element in the iteration. + * + * @return the next element in the iteration + * @throws NoSuchElementException if the iteration has no more elements + * @throws KernelEngineException If any underlying exception occurs in {@link Engine} while + * trying to execute the operation. The original exception (if + * any) is wrapped in this exception as cause. E.g. + * {@link IOException} thrown while trying to read from a Delta + * log file. It will be wrapped in this exception as cause. + * @throws KernelException When encountering an operation or state that is invalid or + * unsupported in Kernel. For example, trying to read from a + * Delta table that has advanced features which are not yet + * supported by Kernel. 
+ */ + @Override + T next(); + default CloseableIterator map(Function mapper) { CloseableIterator delegate = this; return new CloseableIterator() { diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/checkpoints/CheckpointerSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/checkpoints/CheckpointerSuite.scala index d4f9bd51c5c..38c8e56c0cb 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/checkpoints/CheckpointerSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/checkpoints/CheckpointerSuite.scala @@ -16,18 +16,20 @@ package io.delta.kernel.internal.checkpoints import io.delta.kernel.data.{ColumnVector, ColumnarBatch} +import io.delta.kernel.exceptions.KernelEngineException import io.delta.kernel.expressions.Predicate import io.delta.kernel.internal.checkpoints.Checkpointer.findLastCompleteCheckpointBeforeHelper import io.delta.kernel.internal.fs.Path import io.delta.kernel.internal.util.FileNames.checkpointFileSingular import io.delta.kernel.internal.util.Utils -import io.delta.kernel.test.{BaseMockJsonHandler, MockFileSystemClientUtils, MockEngineUtils, VectorTestUtils} +import io.delta.kernel.test.{BaseMockJsonHandler, MockFileSystemClientUtils, VectorTestUtils} import io.delta.kernel.types.StructType import io.delta.kernel.utils.{CloseableIterator, FileStatus} import org.scalatest.funsuite.AnyFunSuite import java.io.{FileNotFoundException, IOException} import java.util.Optional +import scala.util.control.NonFatal class CheckpointerSuite extends AnyFunSuite with MockFileSystemClientUtils { import CheckpointerSuite._ @@ -264,20 +266,25 @@ class MockLastCheckpointMetadataFileReader(maxFailures: Int) extends BaseMockJso val file = fileIter.next() val path = new Path(file.getPath) - if (currentFailCount < maxFailures) { - currentFailCount += 1 - throw new IOException("Retryable exception") - } - Utils.singletonCloseableIterator( - path.getParent match { - case 
VALID_LAST_CHECKPOINT_FILE_TABLE => SAMPLE_LAST_CHECKPOINT_FILE_CONTENT - case ZERO_SIZED_LAST_CHECKPOINT_FILE_TABLE => ZERO_ENTRIES_COLUMNAR_BATCH - case INVALID_LAST_CHECKPOINT_FILE_TABLE => - throw new IOException("Invalid last checkpoint file") - case LAST_CHECKPOINT_FILE_NOT_FOUND_TABLE => - throw new FileNotFoundException("File not found") - case _ => throw new IOException("Unknown table") - }) + try { + if (currentFailCount < maxFailures) { + currentFailCount += 1 + throw new IOException("Retryable exception") + } + + path.getParent match { + case VALID_LAST_CHECKPOINT_FILE_TABLE => SAMPLE_LAST_CHECKPOINT_FILE_CONTENT + case ZERO_SIZED_LAST_CHECKPOINT_FILE_TABLE => ZERO_ENTRIES_COLUMNAR_BATCH + case INVALID_LAST_CHECKPOINT_FILE_TABLE => + throw new IOException("Invalid last checkpoint file") + case LAST_CHECKPOINT_FILE_NOT_FOUND_TABLE => + throw new FileNotFoundException("File not found") + case _ => throw new IOException("Unknown table") + } + } catch { + case NonFatal(e) => throw new KernelEngineException("Failed to read last checkpoint", e); + } + ) } } diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultJsonHandler.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultJsonHandler.java index f8702f0ecb9..7c89fbad6e6 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultJsonHandler.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultJsonHandler.java @@ -19,6 +19,7 @@ import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.*; +import static java.lang.String.format; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.*; @@ -29,6 +30,7 @@ import io.delta.kernel.data.*; import io.delta.kernel.engine.JsonHandler; +import io.delta.kernel.exceptions.KernelEngineException; import io.delta.kernel.expressions.Predicate; import io.delta.kernel.types.*; import 
io.delta.kernel.utils.CloseableIterator; @@ -89,7 +91,7 @@ public StructType deserializeStructType(String structTypeJson) { return DataTypeParser.parseSchema(defaultObjectReader.readTree(structTypeJson)); } catch (JsonProcessingException ex) { throw new RuntimeException( - String.format("Could not parse JSON: %s", structTypeJson), ex); + format("Could not parse JSON: %s", structTypeJson), ex); } } @@ -127,7 +129,8 @@ public boolean hasNext() { } } } catch (IOException ex) { - throw new RuntimeException(ex); + throw new KernelEngineException( + format("Error reading JSON file: %s", currentFile.getPath()), ex); } return nextLine != null; @@ -215,7 +218,7 @@ private Row parseJson(String json, StructType readSchema) { final JsonNode jsonNode = objectReaderReadBigDecimals.readTree(json); return new DefaultJsonRow((ObjectNode) jsonNode, readSchema); } catch (JsonProcessingException ex) { - throw new RuntimeException(String.format("Could not parse JSON: %s", json), ex); + throw new KernelEngineException(format("Could not parse JSON: %s", json), ex); } } } diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileReader.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileReader.java index 250586bf436..46e18cb95a3 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileReader.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileReader.java @@ -16,7 +16,6 @@ package io.delta.kernel.defaults.internal.parquet; import java.io.IOException; -import java.io.UncheckedIOException; import java.net.URI; import java.util.*; import static java.util.Objects.requireNonNull; @@ -34,6 +33,7 @@ import static org.apache.parquet.hadoop.ParquetInputFormat.*; import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.exceptions.KernelEngineException; import io.delta.kernel.expressions.Predicate; import 
io.delta.kernel.types.StructField; import io.delta.kernel.types.StructType; @@ -85,8 +85,8 @@ public boolean hasNext() { hasNotConsumedNextElement = reader.nextKeyValue() && reader.getCurrentValue() != null; return hasNotConsumedNextElement; - } catch (IOException | InterruptedException ie) { - throw new RuntimeException(ie); + } catch (IOException | InterruptedException ex) { + throw new KernelEngineException("Error reading Parquet file: " + path, ex); } } @@ -148,7 +148,7 @@ private void initParquetReaderIfRequired() { reader.initialize(fileReader, confCopy); } catch (IOException e) { Utils.closeCloseablesSilently(fileReader, reader); - throw new UncheckedIOException(e); + throw new KernelEngineException("Error reading Parquet file: " + path, e); } } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala index 39ca5de76c7..bbddfab5dae 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala @@ -18,10 +18,11 @@ package io.delta.kernel.defaults import java.io.File import io.delta.kernel.Table -import io.delta.kernel.engine.{ExpressionHandler, FileSystemClient, Engine} +import io.delta.kernel.engine.{Engine, ExpressionHandler, FileSystemClient} import io.delta.kernel.data.ColumnarBatch -import io.delta.kernel.defaults.engine.{DefaultJsonHandler, DefaultParquetHandler, DefaultEngine} +import io.delta.kernel.defaults.engine.{DefaultEngine, DefaultJsonHandler, DefaultParquetHandler} import io.delta.kernel.expressions.Predicate +import io.delta.kernel.internal.checkpoints.Checkpointer import io.delta.kernel.internal.fs.Path import io.delta.kernel.internal.util.FileNames import io.delta.kernel.internal.util.Utils.toCloseableIterator @@ -35,6 +36,7 @@ import 
org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.functions.col import org.apache.spark.sql.test.SharedSparkSession +import java.nio.file.Files import java.util.Optional import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable.ArrayBuffer @@ -79,7 +81,8 @@ class LogReplayMetricsSuite extends QueryTest table: Table, expJsonVersionsRead: Seq[Long], expParquetVersionsRead: Seq[Long], - expParquetReadSetSizes: Seq[Long]): Unit = { + expParquetReadSetSizes: Seq[Long], + expLastCheckpointReadCalls: Option[Int] = None): Unit = { engine.resetMetrics() val scan = table.getLatestSnapshot(engine).getScanBuilder(engine).build() // get all scan files and iterate through them to trigger the metrics collection @@ -90,14 +93,16 @@ class LogReplayMetricsSuite extends QueryTest engine, expJsonVersionsRead, expParquetVersionsRead, - expParquetReadSetSizes) + expParquetReadSetSizes, + expLastCheckpointReadCalls) } def assertMetrics( engine: MetricsEngine, expJsonVersionsRead: Seq[Long], expParquetVersionsRead: Seq[Long], - expParquetReadSetSizes: Seq[Long]): Unit = { + expParquetReadSetSizes: Seq[Long], + expLastCheckpointReadCalls: Option[Int] = None): Unit = { val actualJsonVersionsRead = engine.getJsonHandler.getVersionsRead val actualParquetVersionsRead = engine.getParquetHandler.getVersionsRead @@ -117,6 +122,12 @@ class LogReplayMetricsSuite extends QueryTest s"$expParquetReadSetSizes but read $actualParquetReadSetSizes" ) } + + expLastCheckpointReadCalls.foreach { expCalls => + val actualCalls = engine.getJsonHandler.getLastCheckpointMetadataReadCalls + assert(actualCalls === expCalls, + s"Expected to read last checkpoint metadata $expCalls times but read $actualCalls times") + } } private def appendCommit(path: String): Unit = @@ -294,6 +305,34 @@ class LogReplayMetricsSuite extends QueryTest expParquetReadSetSizes = Seq(15, 15)) } } + + Seq(true, false).foreach { deleteLastCheckpointMetadataFile => + test("ensure 
`_last_checkpoint` is tried to read only once when " + + s"""${if (deleteLastCheckpointMetadataFile) "not exists" else "valid file exists"}""") { + withTempDirAndEngine { (dir, tc) => + val path = dir.getAbsolutePath + + for (_ <- 0 to 14) { appendCommit(path) } + + if (deleteLastCheckpointMetadataFile) { + assert(Files.deleteIfExists(new File(path, "_delta_log/_last_checkpoint").toPath)) + } + + // there should be one checkpoint file at version 10 + loadScanFilesCheckMetrics( + tc, + Table.forPath(tc, path), + expJsonVersionsRead = 14L to 11L by -1L, + expParquetVersionsRead = Seq(10), + // we read the checkpoint twice: once for the P &M and once for the scan files + expParquetReadSetSizes = Seq(1, 1), + // We try to read `_last_checkpoint` once. If it doesn't exist, we don't try reading + // again. If it exists, we succeed reading in the first time + expLastCheckpointReadCalls = Some(1) + ) + } + } + } } //////////////////// @@ -325,6 +364,9 @@ class MetricsEngine(config: Configuration) extends Engine { * 10.checkpoint.parquet) read */ trait FileReadMetrics { self: Object => + // number of times read is requested on `_last_checkpoint` + private var lastCheckpointMetadataReadCalls = 0 + private val versionsRead = ArrayBuffer[Long]() // Number of checkpoint files requested read in each readParquetFiles call @@ -339,12 +381,17 @@ trait FileReadMetrics { self: Object => if (!versionsRead.contains(version)) { versionsRead += version } + } else if (Checkpointer.LAST_CHECKPOINT_FILE_NAME.equals(path.getName)) { + lastCheckpointMetadataReadCalls += 1 } } def getVersionsRead: Seq[Long] = versionsRead + def getLastCheckpointMetadataReadCalls: Int = lastCheckpointMetadataReadCalls + def resetMetrics(): Unit = { + lastCheckpointMetadataReadCalls = 0 versionsRead.clear() checkpointReadRequestSizes.clear() }