[3.2][Kernel] Handle KernelEngineException when reading the `_last_checkpoint` file (#3086)

There is an issue with the `CloseableIterator` interface that Kernel is
using. Currently, it extends Java's `Iterator`, which doesn't throw any
exceptions. We use `CloseableIterator` when returning data read from a
file or for any other incremental data access. Any `IOException` in `hasNext`
or `next` is wrapped in an `UncheckedIOException` or `RuntimeException`.
Users of the `CloseableIterator` have to catch `UncheckedIOException` or
`RuntimeException` explicitly and inspect the cause if they are
interested in the `IOException`. This is inconsistent and causes
problems for code that wants to handle exceptions such as
`FileNotFoundException` (a subclass of `IOException`) and take further
action. The caller-side sketch below illustrates the problem.
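
For illustration only, here is a minimal caller-side sketch of the pattern the current contract forces. It assumes the `Engine.getJsonHandler().readJsonFiles(files, schema, predicate)` API as it exists in this codebase; the class and method names are hypothetical and not part of this change:

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Optional;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;

class BeforeThisChange {
    // Hypothetical caller that wants to treat a missing file as "no data".
    // Today it must catch the unchecked wrapper and dig the real cause out of it.
    static Optional<ColumnarBatch> readFirstBatch(
            Engine engine, CloseableIterator<FileStatus> files, StructType schema)
            throws IOException {
        try (CloseableIterator<ColumnarBatch> iter =
                 engine.getJsonHandler().readJsonFiles(files, schema, Optional.empty())) {
            return iter.hasNext() ? Optional.of(iter.next()) : Optional.empty();
        } catch (RuntimeException e) { // may be UncheckedIOException or a plain RuntimeException
            if (e.getCause() instanceof FileNotFoundException) {
                return Optional.empty(); // missing file treated as "no data"
            }
            throw e; // anything else is unexpected
        }
    }
}
```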

* Change the `CloseableIterator.{next, hasNext}` contract to expect
`KernelEngineException` for any exceptions that occur while executing in
the `Engine`.
* Update the `DefaultParquetHandler` and `DefaultJsonHandler` to throw
`KernelEngineException` instead of `UncheckedIOException` or
`RuntimeException`.
* In the checkpoint metadata loading method, catch
`KernelEngineException` and check whether the cause is a
`FileNotFoundException`. If so, don't retry loading. The sketch after
this list shows the resulting caller-side pattern.
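
The same hypothetical caller after this change, assuming the same `readJsonFiles` signature; only the catch clause differs, and the `FileNotFoundException` is now reliably reachable as the cause of a single documented exception type:

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Optional;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.KernelEngineException;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;

class AfterThisChange {
    // Same hypothetical caller after the contract change: one exception type
    // to catch, with the original IOException attached as the cause.
    static Optional<ColumnarBatch> readFirstBatch(
            Engine engine, CloseableIterator<FileStatus> files, StructType schema)
            throws IOException {
        try (CloseableIterator<ColumnarBatch> iter =
                 engine.getJsonHandler().readJsonFiles(files, schema, Optional.empty())) {
            return iter.hasNext() ? Optional.of(iter.next()) : Optional.empty();
        } catch (KernelEngineException e) {
            if (e.getCause() instanceof FileNotFoundException) {
                return Optional.empty(); // there is no point retrying a missing file
            }
            throw e; // any other engine failure is propagated
        }
    }
}
```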
vkorukanti committed May 16, 2024
1 parent a7e65c9 commit 2c444a2
Showing 8 changed files with 185 additions and 43 deletions.
@@ -0,0 +1,30 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.exceptions;

import io.delta.kernel.engine.Engine;

/**
* Thrown when the {@link Engine} encounters an error while executing an operation.
*/
public class KernelEngineException extends RuntimeException {
private static final String msgTemplate = "Encountered an error from the underlying engine " +
"implementation while trying to %s: %s";

public KernelEngineException(String attemptedOperation, Throwable cause) {
super(String.format(msgTemplate, attemptedOperation, cause.getMessage()), cause);
}
}
@@ -15,8 +15,7 @@
*/
package io.delta.kernel.internal.checkpoints;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.*;
import java.util.*;
import java.util.stream.Collectors;

@@ -26,6 +25,7 @@
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.KernelEngineException;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;

@@ -239,22 +239,39 @@ private Optional<CheckpointMetaData> loadMetadataFromFile(Engine engine, int tries) {
"Retrying after 1sec. (current attempt = {})",
lastCheckpointFilePath,
tries);
Thread.sleep(1000);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Optional.empty();
}
return loadMetadataFromFile(engine, tries + 1);
}
} catch (FileNotFoundException ex) {
return Optional.empty(); // there is no point retrying
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (IOException ex) {
// TODO: this exception is thrown when `readJsonFiles` is called and not before
// the file is actually read. In current implementation this never happens as the file
// is actually read when the data is fetched from the returned `CloseableIterator`
// from `readJsonFiles`. Once we wrap all calls to engine and throw
// {@link KernelEngineException} instead of {@link IOException}, we can remove this
// catch block.
return Optional.empty();
} catch (Exception ex) {
String msg = String.format(
"Failed to load checkpoint metadata from file %s. " +
"Retrying after 1sec. (current attempt = %s)",
lastCheckpointFilePath, tries);
logger.warn(msg, ex);
// we can retry until max tries are exhausted
return loadMetadataFromFile(engine, tries + 1);
} catch (KernelEngineException ex) {
Throwable cause = ex.getCause();
if (cause instanceof FileNotFoundException) {
return Optional.empty(); // there is no point retrying
} else if (cause instanceof Exception) {
String msg = String.format(
"Failed to load checkpoint metadata from file %s. " +
"It must be in the process of being written. " +
"Retrying after 1sec. (current attempt of %s (max 3)",
lastCheckpointFilePath, tries);
logger.warn(msg, cause);
// we can retry until max tries are exhausted. It saves latency as the alternative
// is to list files and find the last checkpoint file. And the `_last_checkpoint`
// file is possibly being written to.
return loadMetadataFromFile(engine, tries + 1);
}
throw ex;
}
}
}
@@ -260,7 +260,7 @@ private Optional<CloseableIterator<FileStatus>> listFromOrNone(
} catch (FileNotFoundException e) {
return Optional.empty();
} catch (IOException io) {
throw new RuntimeException("Failed to list the files in delta log", io);
throw new UncheckedIOException("Failed to list the files in delta log", io);
}
}

@@ -23,6 +23,9 @@
import java.util.function.Function;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.KernelEngineException;
import io.delta.kernel.exceptions.KernelException;

import io.delta.kernel.internal.util.Utils;

@@ -34,6 +37,41 @@
*/
@Evolving
public interface CloseableIterator<T> extends Iterator<T>, Closeable {

/**
* Returns true if the iteration has more elements. (In other words, returns true if next would
* return an element rather than throwing an exception.)
*
* @return true if the iteration has more elements
* @throws KernelEngineException Thrown when an underlying exception occurs in the {@link Engine}
* while trying to execute the operation. The original exception (if any) is attached as the
* cause, e.g. an {@link IOException} thrown while trying to read from a Delta log file.
* @throws KernelException Thrown when an operation or state is encountered that is invalid or
* unsupported.
*/
@Override
boolean hasNext();

/**
* Returns the next element in the iteration.
*
* @return the next element in the iteration
* @throws NoSuchElementException if the iteration has no more elements
* @throws KernelEngineException Thrown when an underlying exception occurs in the {@link Engine}
* while trying to execute the operation. The original exception (if any) is attached as the
* cause, e.g. an {@link IOException} thrown while trying to read from a Delta log file.
* @throws KernelException Thrown when an operation or state is encountered that is invalid or
* unsupported in Kernel. For example, trying to read from a Delta table that has advanced
* features which are not yet supported by Kernel.
*/
@Override
T next();

default <U> CloseableIterator<U> map(Function<T, U> mapper) {
CloseableIterator<T> delegate = this;
return new CloseableIterator<U>() {
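
To make the documented contract above concrete, here is a minimal, hypothetical implementer-side sketch (not part of this diff): an engine-provided iterator surfaces I/O failures from `hasNext`/`next` as `KernelEngineException` with the original `IOException` as the cause, which is the same pattern the `DefaultJsonHandler` and `DefaultParquetHandler` changes below follow.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.NoSuchElementException;

import io.delta.kernel.exceptions.KernelEngineException;
import io.delta.kernel.utils.CloseableIterator;

// Hypothetical engine-side iterator over the lines of a file.
class LineIterator implements CloseableIterator<String> {
    private final BufferedReader reader;
    private final String filePath;
    private String nextLine;

    LineIterator(BufferedReader reader, String filePath) {
        this.reader = reader;
        this.filePath = filePath;
    }

    @Override
    public boolean hasNext() {
        try {
            if (nextLine == null) {
                nextLine = reader.readLine(); // may throw IOException
            }
        } catch (IOException ex) {
            // Per the new contract: wrap the engine-level failure, keeping the
            // original IOException as the cause for callers that care about it.
            throw new KernelEngineException("read lines from " + filePath, ex);
        }
        return nextLine != null;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        String line = nextLine;
        nextLine = null;
        return line;
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}
```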
@@ -16,18 +16,20 @@
package io.delta.kernel.internal.checkpoints

import io.delta.kernel.data.{ColumnVector, ColumnarBatch}
import io.delta.kernel.exceptions.KernelEngineException
import io.delta.kernel.expressions.Predicate
import io.delta.kernel.internal.checkpoints.Checkpointer.findLastCompleteCheckpointBeforeHelper
import io.delta.kernel.internal.fs.Path
import io.delta.kernel.internal.util.FileNames.checkpointFileSingular
import io.delta.kernel.internal.util.Utils
import io.delta.kernel.test.{BaseMockJsonHandler, MockFileSystemClientUtils, MockEngineUtils, VectorTestUtils}
import io.delta.kernel.test.{BaseMockJsonHandler, MockFileSystemClientUtils, VectorTestUtils}
import io.delta.kernel.types.StructType
import io.delta.kernel.utils.{CloseableIterator, FileStatus}
import org.scalatest.funsuite.AnyFunSuite

import java.io.{FileNotFoundException, IOException}
import java.util.Optional
import scala.util.control.NonFatal

class CheckpointerSuite extends AnyFunSuite with MockFileSystemClientUtils {
import CheckpointerSuite._
@@ -264,20 +266,25 @@ class MockLastCheckpointMetadataFileReader(maxFailures: Int) extends BaseMockJsonHandler {
val file = fileIter.next()
val path = new Path(file.getPath)

if (currentFailCount < maxFailures) {
currentFailCount += 1
throw new IOException("Retryable exception")
}

Utils.singletonCloseableIterator(
path.getParent match {
case VALID_LAST_CHECKPOINT_FILE_TABLE => SAMPLE_LAST_CHECKPOINT_FILE_CONTENT
case ZERO_SIZED_LAST_CHECKPOINT_FILE_TABLE => ZERO_ENTRIES_COLUMNAR_BATCH
case INVALID_LAST_CHECKPOINT_FILE_TABLE =>
throw new IOException("Invalid last checkpoint file")
case LAST_CHECKPOINT_FILE_NOT_FOUND_TABLE =>
throw new FileNotFoundException("File not found")
case _ => throw new IOException("Unknown table")
})
try {
if (currentFailCount < maxFailures) {
currentFailCount += 1
throw new IOException("Retryable exception")
}

path.getParent match {
case VALID_LAST_CHECKPOINT_FILE_TABLE => SAMPLE_LAST_CHECKPOINT_FILE_CONTENT
case ZERO_SIZED_LAST_CHECKPOINT_FILE_TABLE => ZERO_ENTRIES_COLUMNAR_BATCH
case INVALID_LAST_CHECKPOINT_FILE_TABLE =>
throw new IOException("Invalid last checkpoint file")
case LAST_CHECKPOINT_FILE_NOT_FOUND_TABLE =>
throw new FileNotFoundException("File not found")
case _ => throw new IOException("Unknown table")
}
} catch {
case NonFatal(e) => throw new KernelEngineException("Failed to read last checkpoint", e);
}
)
}
}
@@ -19,6 +19,7 @@
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.*;
import static java.lang.String.format;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.*;
@@ -29,6 +30,7 @@

import io.delta.kernel.data.*;
import io.delta.kernel.engine.JsonHandler;
import io.delta.kernel.exceptions.KernelEngineException;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.CloseableIterator;
@@ -89,7 +91,7 @@ public StructType deserializeStructType(String structTypeJson) {
return DataTypeParser.parseSchema(defaultObjectReader.readTree(structTypeJson));
} catch (JsonProcessingException ex) {
throw new RuntimeException(
String.format("Could not parse JSON: %s", structTypeJson), ex);
format("Could not parse JSON: %s", structTypeJson), ex);
}
}

@@ -127,7 +129,8 @@ public boolean hasNext() {
}
}
} catch (IOException ex) {
throw new RuntimeException(ex);
throw new KernelEngineException(
format("Error reading JSON file: %s", currentFile.getPath()), ex);
}

return nextLine != null;
@@ -215,7 +218,7 @@ private Row parseJson(String json, StructType readSchema) {
final JsonNode jsonNode = objectReaderReadBigDecimals.readTree(json);
return new DefaultJsonRow((ObjectNode) jsonNode, readSchema);
} catch (JsonProcessingException ex) {
throw new RuntimeException(String.format("Could not parse JSON: %s", json), ex);
throw new KernelEngineException(format("Could not parse JSON: %s", json), ex);
}
}
}
@@ -16,7 +16,6 @@
package io.delta.kernel.defaults.internal.parquet;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.*;
import static java.util.Objects.requireNonNull;
@@ -34,6 +33,7 @@
import static org.apache.parquet.hadoop.ParquetInputFormat.*;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.exceptions.KernelEngineException;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
@@ -85,8 +85,8 @@ public boolean hasNext() {
hasNotConsumedNextElement = reader.nextKeyValue() &&
reader.getCurrentValue() != null;
return hasNotConsumedNextElement;
} catch (IOException | InterruptedException ie) {
throw new RuntimeException(ie);
} catch (IOException | InterruptedException ex) {
throw new KernelEngineException("Error reading Parquet file: " + path, ex);
}
}

@@ -148,7 +148,7 @@ private void initParquetReaderIfRequired() {
reader.initialize(fileReader, confCopy);
} catch (IOException e) {
Utils.closeCloseablesSilently(fileReader, reader);
throw new UncheckedIOException(e);
throw new KernelEngineException("Error reading Parquet file: " + path, e);
}
}
}