Skip to content

Commit

Permalink
done
Browse files Browse the repository at this point in the history
  • Loading branch information
scottsand-db committed Jan 30, 2025
1 parent 57029e1 commit 3b8c84c
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,19 @@ public interface ScanBuilder {
* read from the scan files (returned by {@link Scan#getScanFiles(Engine)}) to completely filter
* out the data that doesn't satisfy the filter.```
*
* @param engine {@link Engine} instance to use in Delta Kernel.
* @param predicate a {@link Predicate} to prune the metadata or data.
* @return A {@link ScanBuilder} with filter applied.
*/
ScanBuilder withFilter(Engine engine, Predicate predicate);
ScanBuilder withFilter(Predicate predicate);

/**
* Apply the given <i>readSchema</i>. If the builder already has a projection applied, calling
* this again replaces the existing projection.
*
* @param engine {@link Engine} instance to use in Delta Kernel.
* @param readSchema Subset of columns to read from the Delta table.
* @return A {@link ScanBuilder} with projection pruning.
*/
ScanBuilder withReadSchema(Engine engine, StructType readSchema);
ScanBuilder withReadSchema(StructType readSchema);

/** @return Build the {@link Scan instance} */
Scan build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import io.delta.kernel.Scan;
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
Expand Down Expand Up @@ -55,7 +54,7 @@ public ScanBuilderImpl(
}

@Override
public ScanBuilder withFilter(Engine engine, Predicate predicate) {
public ScanBuilder withFilter(Predicate predicate) {
if (this.predicate.isPresent()) {
throw new IllegalArgumentException("There already exists a filter in current builder");
}
Expand All @@ -64,7 +63,7 @@ public ScanBuilder withFilter(Engine engine, Predicate predicate) {
}

@Override
public ScanBuilder withReadSchema(Engine engine, StructType readSchema) {
public ScanBuilder withReadSchema(StructType readSchema) {
// TODO: validate the readSchema is a subset of the table schema
this.readSchema = readSchema;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static boolean partitionExists(
io.delta.kernel.internal.util.PartitionUtils.validatePredicateOnlyOnPartitionColumns(
partitionPredicate, snapshotPartColNames);

final Scan scan = snapshot.getScanBuilder().withFilter(engine, partitionPredicate).build();
final Scan scan = snapshot.getScanBuilder().withFilter(partitionPredicate).build();

try (CloseableIterator<FilteredColumnarBatch> columnarBatchIter = scan.getScanFiles(engine)) {
while (columnarBatchIter.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,13 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
val snapshot = latestSnapshot(tablePath)
hits.foreach { predicate =>
val scanFiles = collectScanFileRows(
snapshot.getScanBuilder()
.withFilter(defaultEngine, predicate)
.build())
snapshot.getScanBuilder().withFilter(predicate).build())
assert(scanFiles.nonEmpty, s"Expected hit but got miss for $predicate")
}
misses.foreach { predicate =>
val scanFiles = collectScanFileRows(
snapshot.getScanBuilder()
.withFilter(defaultEngine, predicate)
.withFilter(predicate)
.build())
assert(scanFiles.isEmpty, s"Expected miss but got hit for $predicate\n" +
s"Returned scan files have stats: ${getScanFileStats(scanFiles)}"
Expand All @@ -121,9 +119,7 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
val snapshot = latestSnapshot(tablePath)
filterToNumExpFiles.foreach { case (filter, numExpFiles) =>
val scanFiles = collectScanFileRows(
snapshot.getScanBuilder()
.withFilter(defaultEngine, filter)
.build())
snapshot.getScanBuilder().withFilter(filter).build())
assert(scanFiles.length == numExpFiles,
s"Expected $numExpFiles but found ${scanFiles.length} for $filter")
}
Expand Down Expand Up @@ -1010,7 +1006,7 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
predicate: Predicate, expNumPartitions: Int, expNumFiles: Long): Unit = {
val snapshot = latestSnapshot(tableDir.getCanonicalPath)
val scanFiles = collectScanFileRows(
snapshot.getScanBuilder().withFilter(defaultEngine, predicate).build())
snapshot.getScanBuilder().withFilter(predicate).build())
assert(scanFiles.length == expNumFiles,
s"Expected $expNumFiles but found ${scanFiles.length} for $predicate")

Expand Down Expand Up @@ -1496,7 +1492,7 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
val partFilter = equals(new Column("part"), ofInt(1))
verifyNoStatsColumn(
snapshot(engineDisallowedStatsReads)
.getScanBuilder().withFilter(engine, partFilter).build()
.getScanBuilder().withFilter(partFilter).build()
.getScanFiles(engine))

// no eligible data skipping filter --> don't read stats
Expand All @@ -1505,7 +1501,7 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
ofInt(1))
verifyNoStatsColumn(
snapshot(engineDisallowedStatsReads)
.getScanBuilder().withFilter(engine, nonEligibleFilter).build()
.getScanBuilder().withFilter(nonEligibleFilter).build()
.getScanFiles(engine))
}

Expand Down Expand Up @@ -1543,9 +1539,7 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
val engine = engineVerifyJsonParseSchema(verifySchema(expectedCols))
collectScanFileRows(
Table.forPath(engine, path).getLatestSnapshot(engine)
.getScanBuilder()
.withFilter(engine, predicate)
.build(),
.getScanBuilder().withFilter(predicate).build(),
engine = engine)
}
}
Expand Down Expand Up @@ -1573,7 +1567,6 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
latestSnapshot(tempDir.getCanonicalPath)
.getScanBuilder()
.withFilter(
defaultEngine,
greaterThan(
new ScalarExpression("+", Seq(col("id"), ofInt(10)).asJava),
ofInt(100)
Expand All @@ -1584,13 +1577,8 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with
checkStatsPresent(
latestSnapshot(tempDir.getCanonicalPath)
.getScanBuilder()
.withFilter(
defaultEngine,
greaterThan(
col("id"),
ofInt(0)
)
).build()
.withFilter(greaterThan(col("id"), ofInt(0)))
.build()
)
}
}
Expand Down Expand Up @@ -1634,7 +1622,7 @@ class ScanSuite extends AnyFunSuite with TestUtils with ExpressionTestUtils with

val scanBuilder = snapshot.getScanBuilder()
val scan = predicate match {
case Some(pred) => scanBuilder.withFilter(defaultEngine, pred).build()
case Some(pred) => scanBuilder.withFilter(pred).build()
case None => scanBuilder.build()
}
val scanFiles = scan.asInstanceOf[ScanImpl].getScanFiles(defaultEngine, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,11 @@ trait TestUtils extends Assertions with SQLHelper {
var scanBuilder = snapshot.getScanBuilder()

if (readSchema != null) {
scanBuilder = scanBuilder.withReadSchema(engine, readSchema)
scanBuilder = scanBuilder.withReadSchema(readSchema)
}

if (filter != null) {
scanBuilder = scanBuilder.withFilter(engine, filter)
scanBuilder = scanBuilder.withFilter(filter)
}

val scan = scanBuilder.build()
Expand Down Expand Up @@ -265,7 +265,7 @@ trait TestUtils extends Assertions with SQLHelper {
val scan = Table.forPath(engine, tablePath)
.getLatestSnapshot(engine)
.getScanBuilder()
.withReadSchema(engine, readSchema)
.withReadSchema(readSchema)
.build()
val scanState = scan.getScanState(engine)

Expand Down

0 comments on commit 3b8c84c

Please sign in to comment.