Skip to content

Commit

Permalink
Add Iceberg Filter Pushdown Optimizer Rule for execution with Velox
Browse files Browse the repository at this point in the history
Co-authored-by: Reetika Agrawal <[email protected]>
Co-authored-by: Tim Meehan <[email protected]>
  • Loading branch information
3 people committed Dec 4, 2023
1 parent 82961c4 commit c1ec0ff
Show file tree
Hide file tree
Showing 17 changed files with 1,111 additions and 55 deletions.
6 changes: 6 additions & 0 deletions presto-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-analyzer</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.facebook.presto.common.type.SqlTimestampWithTimeZone;
import com.facebook.presto.common.type.TimestampWithTimeZoneType;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.HivePartition;
import com.facebook.presto.hive.HiveWrittenPartitions;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.spi.ColumnHandle;
Expand All @@ -35,6 +36,7 @@
import com.facebook.presto.spi.ConnectorTableLayoutResult;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.DiscretePredicates;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
Expand All @@ -43,6 +45,9 @@
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorOutputMetadata;
import com.facebook.presto.spi.connector.ConnectorTableVersion;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.facebook.presto.spi.statistics.TableStatisticType;
Expand Down Expand Up @@ -90,6 +95,11 @@

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.hive.MetadataUtils.createPredicate;
import static com.facebook.presto.hive.MetadataUtils.getCombinedRemainingPredicate;
import static com.facebook.presto.hive.MetadataUtils.getDiscretePredicates;
import static com.facebook.presto.hive.MetadataUtils.getPredicate;
import static com.facebook.presto.hive.MetadataUtils.getSubfieldPredicate;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergColumnHandle.primitiveIcebergColumnHandle;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
Expand Down Expand Up @@ -131,13 +141,22 @@ public abstract class IcebergAbstractMetadata
protected final TypeManager typeManager;
protected final JsonCodec<CommitTaskData> commitTaskCodec;
protected final NodeVersion nodeVersion;
protected final RowExpressionService rowExpressionService;
private final StandardFunctionResolution functionResolution;

protected Transaction transaction;

public IcebergAbstractMetadata(TypeManager typeManager, JsonCodec<CommitTaskData> commitTaskCodec, NodeVersion nodeVersion)
public IcebergAbstractMetadata(
TypeManager typeManager,
StandardFunctionResolution functionResolution,
RowExpressionService rowExpressionService,
JsonCodec<CommitTaskData> commitTaskCodec,
NodeVersion nodeVersion)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
}

Expand Down Expand Up @@ -165,17 +184,19 @@ public List<ConnectorTableLayoutResult> getTableLayouts(
TupleDomain<ColumnHandle> partitionColumnPredicate = TupleDomain.withColumnDomains(Maps.filterKeys(constraint.getSummary().getDomains().get(), Predicates.in(getPartitionKeyColumnHandles(icebergTable, typeManager))));
Optional<Set<IcebergColumnHandle>> requestedColumns = desiredColumns.map(columns -> columns.stream().map(column -> (IcebergColumnHandle) column).collect(toImmutableSet()));

ConnectorTableLayout layout = new ConnectorTableLayout(new IcebergTableLayoutHandle.Builder()
.setPartitionColumns(getPartitionKeyColumnHandles(icebergTable, typeManager))
.setDomainPredicate(constraint.getSummary().transform(IcebergAbstractMetadata::toSubfield))
.setRemainingPredicate(TRUE_CONSTANT)
.setPredicateColumns(predicateColumns)
.setRequestedColumns(requestedColumns)
.setPushdownFilterEnabled(isPushdownFilterEnabled(session))
.setPartitionColumnPredicate(partitionColumnPredicate)
.setPartitions(Optional.empty())
.setTable(handle)
.build());
ConnectorTableLayout layout = getTableLayout(
session,
new IcebergTableLayoutHandle.Builder()
.setPartitionColumns(getPartitionKeyColumnHandles(icebergTable, typeManager))
.setDomainPredicate(constraint.getSummary().transform(IcebergAbstractMetadata::toSubfield))
.setRemainingPredicate(TRUE_CONSTANT)
.setPredicateColumns(predicateColumns)
.setRequestedColumns(requestedColumns)
.setPushdownFilterEnabled(isPushdownFilterEnabled(session))
.setPartitionColumnPredicate(partitionColumnPredicate)
.setPartitions(Optional.empty())
.setTable(handle)
.build());
return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
}

Expand All @@ -192,10 +213,57 @@ protected static boolean isEntireColumn(Subfield subfield)
@Override
public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle)
{
IcebergTableHandle tableHandle = ((IcebergTableLayoutHandle) handle).getTable();
Table table = getIcebergTable(session, tableHandle.getSchemaTableName());
validateTableMode(session, table);
return new ConnectorTableLayout(handle);
IcebergTableLayoutHandle icebergTableLayoutHandle = (IcebergTableLayoutHandle) handle;

IcebergTableHandle tableHandle = icebergTableLayoutHandle.getTable();
if (!tableExists(session, tableHandle.getSchemaTableName())) {
return null;
}

Table icebergTable = getIcebergTable(session, tableHandle.getSchemaTableName());
validateTableMode(session, icebergTable);

if (!isPushdownFilterEnabled(session)) {
return new ConnectorTableLayout(handle);
}

List<ColumnHandle> partitionColumns = ImmutableList.copyOf(icebergTableLayoutHandle.getPartitionColumns());
List<HivePartition> partitions = icebergTableLayoutHandle.getPartitions().get();

Optional<DiscretePredicates> discretePredicates = getDiscretePredicates(partitionColumns, partitions);

TupleDomain<ColumnHandle> predicate;
RowExpression subfieldPredicate;
if (isPushdownFilterEnabled(session)) {
Map<String, ColumnHandle> predicateColumns = icebergTableLayoutHandle.getPredicateColumns().entrySet()
.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

predicate = getPredicate(icebergTableLayoutHandle, partitionColumns, partitions, predicateColumns);

// capture subfields from domainPredicate to add to remainingPredicate
// so those filters don't get lost
Map<String, com.facebook.presto.common.type.Type> columnTypes = getColumns(icebergTable.schema(), icebergTable.spec(), typeManager).stream()
.collect(toImmutableMap(IcebergColumnHandle::getName, icebergColumnHandle -> getColumnMetadata(session, tableHandle, icebergColumnHandle).getType()));

subfieldPredicate = getSubfieldPredicate(session, icebergTableLayoutHandle, columnTypes, functionResolution, rowExpressionService);
}
else {
predicate = createPredicate(partitionColumns, partitions);
subfieldPredicate = TRUE_CONSTANT;
}

// combine subfieldPredicate with remainingPredicate
RowExpression combinedRemainingPredicate = getCombinedRemainingPredicate(icebergTableLayoutHandle, subfieldPredicate);

return new ConnectorTableLayout(
icebergTableLayoutHandle,
Optional.empty(),
predicate,
Optional.empty(),
Optional.empty(),
discretePredicates,
ImmutableList.of(),
Optional.of(combinedRemainingPredicate));
}

protected Optional<SystemTable> getIcebergSystemTable(SchemaTableName tableName, Table table)
Expand Down Expand Up @@ -351,6 +419,12 @@ public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, Connect
return primitiveIcebergColumnHandle(0, "$row_id", BIGINT, Optional.empty());
}

@Override
public boolean isLegacyGetLayoutSupported(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return !isPushdownFilterEnabled(session);
}

protected List<ColumnMetadata> getColumnMetadatas(Table table)
{
return table.schema().columns().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@
import com.facebook.presto.hive.gcs.HiveGcsConfig;
import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer;
import com.facebook.presto.iceberg.nessie.NessieConfig;
import com.facebook.presto.iceberg.optimizer.IcebergParquetDereferencePushDown;
import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizer;
import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizerProvider;
import com.facebook.presto.orc.CachingStripeMetadataSource;
import com.facebook.presto.orc.DwrfAwareStripeMetadataSourceFactory;
import com.facebook.presto.orc.EncryptionLibrary;
Expand All @@ -64,6 +63,7 @@
import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.cache.Cache;
Expand Down Expand Up @@ -155,8 +155,7 @@ public void setup(Binder binder)

configBinder(binder).bindConfig(ParquetCacheConfig.class, connectorId);

binder.bind(IcebergPlanOptimizer.class).in(Scopes.SINGLETON);
binder.bind(IcebergParquetDereferencePushDown.class).in(Scopes.SINGLETON);
binder.bind(ConnectorPlanOptimizerProvider.class).to(IcebergPlanOptimizerProvider.class).in(Scopes.SINGLETON);
}

@ForCachingHiveMetastore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import com.facebook.airlift.bootstrap.LifeCycleManager;
import com.facebook.presto.hive.HiveTransactionHandle;
import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizerProvider;
import com.facebook.presto.spi.ConnectorPlanOptimizer;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.connector.Connector;
Expand Down Expand Up @@ -64,7 +62,7 @@ public class IcebergConnector
private final List<PropertyMetadata<?>> columnProperties;
private final ConnectorAccessControl accessControl;
private final Set<Procedure> procedures;
private final Set<ConnectorPlanOptimizer> planOptimizers;
private final ConnectorPlanOptimizerProvider planOptimizerProvider;

public IcebergConnector(
LifeCycleManager lifeCycleManager,
Expand All @@ -81,7 +79,7 @@ public IcebergConnector(
List<PropertyMetadata<?>> columnProperties,
ConnectorAccessControl accessControl,
Set<Procedure> procedures,
Set<ConnectorPlanOptimizer> planOptimizers)
ConnectorPlanOptimizerProvider planOptimizerProvider)
{
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
Expand All @@ -97,7 +95,7 @@ public IcebergConnector(
this.columnProperties = ImmutableList.copyOf(requireNonNull(columnProperties, "columnProperties is null"));
this.accessControl = requireNonNull(accessControl, "accessControl is null");
this.procedures = requireNonNull(procedures, "procedures is null");
this.planOptimizers = requireNonNull(planOptimizers, "planOptimizers is null");
this.planOptimizerProvider = requireNonNull(planOptimizerProvider, "planOptimizerProvider is null");
}

@Override
Expand Down Expand Up @@ -218,6 +216,6 @@ public final void shutdown()
@Override
public ConnectorPlanOptimizerProvider getConnectorPlanOptimizerProvider()
{
return new IcebergPlanOptimizerProvider(planOptimizers);
return planOptimizerProvider;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.ViewNotFoundException;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
Expand Down Expand Up @@ -137,22 +138,26 @@ public class IcebergHiveMetadata
private final DateTimeZone timeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(ZoneId.of(TimeZone.getDefault().getID())));

private final FilterStatsCalculatorService filterStatsCalculatorService;
private final RowExpressionService rowExpressionService;

public IcebergHiveMetadata(
ExtendedHiveMetastore metastore,
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
StandardFunctionResolution functionResolution,
RowExpressionService rowExpressionService,
JsonCodec<CommitTaskData> commitTaskCodec,
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService,
RowExpressionService rowExpressionService)
FilterStatsCalculatorService filterStatsCalculatorService)
{
super(typeManager, commitTaskCodec, nodeVersion);
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion);
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
}

public ExtendedHiveMetastore getMetastore()
{
return metastore;
}

@Override
Expand Down Expand Up @@ -315,7 +320,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
// TODO: support path override in Iceberg table creation
org.apache.iceberg.Table table = getHiveIcebergTable(metastore, hdfsEnvironment, session, handle.getSchemaTableName());
org.apache.iceberg.Table table = getIcebergTable(session, handle.getSchemaTableName());
Optional<Map<String, String>> tableProperties = tryGetProperties(table);
if (tableProperties.isPresent()) {
if (tableProperties.get().containsKey(OBJECT_STORE_PATH) ||
Expand All @@ -342,7 +347,7 @@ protected ConnectorTableMetadata getTableMetadata(ConnectorSession session, Sche
throw new TableNotFoundException(table);
}

org.apache.iceberg.Table icebergTable = getHiveIcebergTable(metastore, hdfsEnvironment, session, table);
org.apache.iceberg.Table icebergTable = getIcebergTable(session, table);
List<ColumnMetadata> columns = getColumnMetadatas(icebergTable);

return new ConnectorTableMetadata(table, columns, createMetadataProperties(icebergTable), getTableComment(icebergTable));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
import com.facebook.presto.spi.relation.RowExpressionService;

Expand All @@ -33,33 +34,36 @@ public class IcebergHiveMetadataFactory
final HdfsEnvironment hdfsEnvironment;
final TypeManager typeManager;
final JsonCodec<CommitTaskData> commitTaskCodec;
final StandardFunctionResolution functionResolution;
final RowExpressionService rowExpressionService;
final NodeVersion nodeVersion;
final FilterStatsCalculatorService filterStatsCalculatorService;
final RowExpressionService rowExpressionService;

@Inject
public IcebergHiveMetadataFactory(
IcebergConfig config,
ExtendedHiveMetastore metastore,
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
StandardFunctionResolution functionResolution,
RowExpressionService rowExpressionService,
JsonCodec<CommitTaskData> commitTaskCodec,
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService,
RowExpressionService rowExpressionService)
FilterStatsCalculatorService filterStatsCalculatorService)
{
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
requireNonNull(config, "config is null");
}

public ConnectorMetadata create()
{
return new IcebergHiveMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec, nodeVersion, filterStatsCalculatorService, rowExpressionService);
return new IcebergHiveMetadata(metastore, hdfsEnvironment, typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService);
}
}
Loading

0 comments on commit c1ec0ff

Please sign in to comment.