diff --git a/storage-s3-dynamodb/src/main/java/io/delta/storage/BaseExternalLogStore.java b/storage-s3-dynamodb/src/main/java/io/delta/storage/BaseExternalLogStore.java index 1897125c784..dc295ed943f 100644 --- a/storage-s3-dynamodb/src/main/java/io/delta/storage/BaseExternalLogStore.java +++ b/storage-s3-dynamodb/src/main/java/io/delta/storage/BaseExternalLogStore.java @@ -21,6 +21,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Iterator; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -128,14 +129,19 @@ public BaseExternalLogStore(Configuration hadoopConf) { public Iterator listFrom(Path path, Configuration hadoopConf) throws IOException { final FileSystem fs = path.getFileSystem(hadoopConf); final Path resolvedPath = stripUserInfo(fs.makeQualified(path)); - final Path tablePath = getTablePath(resolvedPath); - final Optional entry = getLatestExternalEntry(tablePath); - - if (entry.isPresent() && !entry.get().complete) { - // Note: `fixDeltaLog` will apply per-JVM mutual exclusion via a lock to help reduce - // the chance of many reader threads in a single JVM doing duplicate copies of - // T(N) -> N.json. - fixDeltaLog(fs, entry.get()); + + // VACUUM operations may use this LogStore::listFrom API. We don't need to attempt to + // perform a fix/recovery during such operations that are not listing the _delta_log. + if (isDeltaLogPath(resolvedPath)) { + final Path tablePath = getTablePath(resolvedPath); + final Optional entry = getLatestExternalEntry(tablePath); + + if (entry.isPresent() && !entry.get().complete) { + // Note: `fixDeltaLog` will apply per-JVM mutual exclusion via a lock to help reduce + // the chance of many reader threads in a single JVM doing duplicate copies of + // T(N) -> N.json. + fixDeltaLog(fs, entry.get()); + } } // This is predicated on the storage system providing consistent listing @@ -471,4 +477,14 @@ private Path stripUserInfo(Path path) { throw new IllegalArgumentException(e); } } + + /** Returns true if this path is contained within a _delta_log folder. */ + @VisibleForTesting + protected boolean isDeltaLogPath(Path normalizedPath) { + return Arrays.stream(normalizedPath + .toUri() + .toString() + .split(Path.SEPARATOR) + ).anyMatch("_delta_log"::equals); + } } diff --git a/storage-s3-dynamodb/src/test/java/io/delta/storage/MemoryLogStore.java b/storage-s3-dynamodb/src/test/java/io/delta/storage/MemoryLogStore.java index 7cb58f62a78..95f5ce9a3d6 100644 --- a/storage-s3-dynamodb/src/test/java/io/delta/storage/MemoryLogStore.java +++ b/storage-s3-dynamodb/src/test/java/io/delta/storage/MemoryLogStore.java @@ -29,10 +29,19 @@ * database) */ public class MemoryLogStore extends BaseExternalLogStore { + public static String IS_DELTA_LOG_PATH_OVERRIDE_KEY = + "spark.hadoop.io.delta.storage.MemoryLogStore.isDeltaLogPath.alwaysTrue"; + + public static int numGetLatestExternalEntryCalls = 0; + public MemoryLogStore(Configuration hadoopConf) { super(hadoopConf); } + /////////////////// + // API Overrides // + /////////////////// + @Override protected void putExternalEntry( ExternalCommitEntry entry, @@ -69,6 +78,8 @@ protected Optional getExternalEntry( @Override protected Optional getLatestExternalEntry(Path tablePath) { + numGetLatestExternalEntryCalls++; + final Path fixedTablePath = new Path(fixPathSchema(tablePath.toString())); return hashMap .values() @@ -77,6 +88,19 @@ protected Optional getLatestExternalEntry(Path tablePath) { .max(Comparator.comparing(ExternalCommitEntry::absoluteFilePath)); } + @Override + protected boolean isDeltaLogPath(Path normalizedPath) { + if (initHadoopConf().getBoolean(IS_DELTA_LOG_PATH_OVERRIDE_KEY, false)) { + return true; // hardcoded to return true + } else { + return super.isDeltaLogPath(normalizedPath); // only return true if in _delta_log folder + } + } + + //////////////////// + // Static Helpers // + //////////////////// + /** * ExternalLogStoreSuite sometimes uses "failing:" scheme prefix to inject errors during tests * However, we want lookups for the same $tablePath to return the same result, regardless of diff --git a/storage-s3-dynamodb/src/test/scala/io/delta/storage/ExternalLogStoreSuite.scala b/storage-s3-dynamodb/src/test/scala/io/delta/storage/ExternalLogStoreSuite.scala index 945618f355b..552de23d695 100644 --- a/storage-s3-dynamodb/src/test/scala/io/delta/storage/ExternalLogStoreSuite.scala +++ b/storage-s3-dynamodb/src/test/scala/io/delta/storage/ExternalLogStoreSuite.scala @@ -24,16 +24,27 @@ import org.apache.hadoop.fs._ import org.scalatest.funsuite.AnyFunSuite import org.apache.spark.sql.delta.FakeFileSystem +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.storage.LogStoreAdaptor import org.apache.spark.sql.delta.util.FileNames +import org.apache.spark.sql.functions.col ///////////////////// // Base Test Suite // ///////////////////// class ExternalLogStoreSuite extends org.apache.spark.sql.delta.PublicLogStoreSuite { + protected def shouldUseRenameToWriteCheckpoint: Boolean = false + override protected val publicLogStoreClassName: String = classOf[MemoryLogStore].getName + protected override def beforeEach(): Unit = { + super.beforeEach() + + MemoryLogStore.numGetLatestExternalEntryCalls = 0 + } + testHadoopConf( expectedErrMsg = "No FileSystem for scheme \"fake\"", "fs.fake.impl" -> classOf[FakeFileSystem].getName, @@ -48,6 +59,102 @@ class ExternalLogStoreSuite extends org.apache.spark.sql.delta.PublicLogStoreSui FileNames.deltaFile(new Path(s"failing:${logDir.getCanonicalPath}"), version) } + test("#3423: listFrom only checks latest external store entry if listing a delta log file") { + withTempDir { tempDir => + val store = createLogStore(spark) + .asInstanceOf[LogStoreAdaptor].logStoreImpl + .asInstanceOf[MemoryLogStore] + val logDir = new File(tempDir.getCanonicalPath, "_delta_log") + logDir.mkdir() + + val deltaFilePath = getDeltaVersionPath(logDir, 0) + val dataFilePath = new Path(tempDir.getCanonicalPath, ".part-00000-da82aeb5-snappy.parquet") + + val fs = deltaFilePath.getFileSystem(sessionHadoopConf) + fs.create(deltaFilePath).close() + fs.create(dataFilePath).close() + + assert(MemoryLogStore.numGetLatestExternalEntryCalls == 0) + + store.listFrom(deltaFilePath, sessionHadoopConf) + assert(MemoryLogStore.numGetLatestExternalEntryCalls == 1) // contacted external store + + store.listFrom(dataFilePath, sessionHadoopConf) + assert(MemoryLogStore.numGetLatestExternalEntryCalls == 1) // did not contact external store + } + } + + test("#3423: VACUUM does not check external store for latest entry") { + + // previous behaviour: always check external store for latest entry when listing + // current behaviour: only check external store for latest entry when listing a delta log file + def doVacuumTestGetNumVacuumExternalStoreCalls(usePreviousListBehavior: Boolean): Int = { + var ret = -1 + + withTempDir { tempDir => + withSQLConf(DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED.key -> "false") { + spark.conf.set( + MemoryLogStore.IS_DELTA_LOG_PATH_OVERRIDE_KEY, + usePreviousListBehavior) + + val path = tempDir.getCanonicalPath + + spark.range(100) + .withColumn("part", col("id") % 10) + .write + .format("delta") + .partitionBy("part") + .save(path) + + spark.sql(s"DELETE FROM delta.`$path`") + + val numExternalCallsBeforeVacuum = MemoryLogStore.numGetLatestExternalEntryCalls + + spark.sql(s"VACUUM delta.`$path` RETAIN 0 HOURS") + + val numExternalCallsAfterVacuum = MemoryLogStore.numGetLatestExternalEntryCalls + + ret = numExternalCallsAfterVacuum - numExternalCallsBeforeVacuum + } + } + + ret + } + + assert( + doVacuumTestGetNumVacuumExternalStoreCalls(true) > + doVacuumTestGetNumVacuumExternalStoreCalls(false) + ) + } + + test("#3423: BaseExternalLogStore::isDeltaLogPath") { + val store = createLogStore(spark) + .asInstanceOf[LogStoreAdaptor].logStoreImpl + .asInstanceOf[MemoryLogStore] + + // json file + assert(store.isDeltaLogPath(new Path("s3://bucket/_delta_log/0000.json"))) + + // checkpoint file + assert(store.isDeltaLogPath(new Path("s3://bucket/_delta_log/0010.checkpoint.parquet"))) + + // file listing prefix + assert(store.isDeltaLogPath(new Path("s3://bucket/_delta_log/0000."))) + + // delta_log folder (with / prefix) + assert(store.isDeltaLogPath(new Path("s3://bucket/_delta_log/"))) + + // delta_log folder (without / prefix) + assert(store.isDeltaLogPath(new Path("s3://bucket/_delta_log"))) + + // obvious negative cases + assert(!store.isDeltaLogPath(new Path("s3://bucket/part-000-UUID.parquet"))) + + // edge cases of `_delta_log` in a folder + assert(!store.isDeltaLogPath(new Path("s3://bucket/__delta_log/"))) + assert(!store.isDeltaLogPath(new Path("s3://bucket/_delta_log_"))) + } + test("single write") { withTempLogDir { tempLogDir => val store = createLogStore(spark) @@ -262,8 +369,6 @@ class ExternalLogStoreSuite extends org.apache.spark.sql.delta.PublicLogStoreSui } } } - - protected def shouldUseRenameToWriteCheckpoint: Boolean = false } ///////////////////////////////////