[iceberg] [native] Introduce Iceberg Connector in Prestissimo (with multi-level inheritance) #21905

Closed
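For orientation, here is a minimal sketch (not literal PR code; fields, constructors, and imports are abbreviated) of the layout-handle hierarchy as it stands after the changes below: BaseHiveTableLayoutHandle keeps only the predicate-related state, while HiveTableLayoutHandle and IcebergTableLayoutHandle each carry their own strongly typed partition-column list, so the casts between BaseHiveColumnHandle and the concrete column-handle types disappear from call sites.

// Abbreviated sketch of the resulting hierarchy, inferred from the diff below.
public class BaseHiveTableLayoutHandle
        implements ConnectorTableLayoutHandle
{
    private final TupleDomain<Subfield> domainPredicate;
    private final RowExpression remainingPredicate;
    private final boolean pushdownFilterEnabled;
    private final TupleDomain<ColumnHandle> partitionColumnPredicate;
    private final Optional<List<HivePartition>> partitions;
    // partitionColumns no longer lives in the base class
}

public class HiveTableLayoutHandle
        extends BaseHiveTableLayoutHandle
{
    // Hive-specific, strongly typed partition columns
    private final List<HiveColumnHandle> partitionColumns;

    @JsonProperty
    public List<HiveColumnHandle> getPartitionColumns()
    {
        return partitionColumns;
    }
}

public class IcebergTableLayoutHandle
        extends BaseHiveTableLayoutHandle
{
    // Iceberg-specific, strongly typed partition columns
    private final List<IcebergColumnHandle> partitionColumns;

    @JsonProperty
    public List<IcebergColumnHandle> getPartitionColumns()
    {
        return partitionColumns;
    }
}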
@@ -21,7 +21,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Optional;
@@ -31,7 +30,6 @@
public class BaseHiveTableLayoutHandle
implements ConnectorTableLayoutHandle
{
private final List<BaseHiveColumnHandle> partitionColumns;
private final TupleDomain<Subfield> domainPredicate;
private final RowExpression remainingPredicate;
private final boolean pushdownFilterEnabled;
@@ -42,14 +40,12 @@ public class BaseHiveTableLayoutHandle

@JsonCreator
public BaseHiveTableLayoutHandle(
@JsonProperty("partitionColumns") List<BaseHiveColumnHandle> partitionColumns,
@JsonProperty("domainPredicate") TupleDomain<Subfield> domainPredicate,
@JsonProperty("remainingPredicate") RowExpression remainingPredicate,
@JsonProperty("pushdownFilterEnabled") boolean pushdownFilterEnabled,
@JsonProperty("partitionColumnPredicate") TupleDomain<ColumnHandle> partitionColumnPredicate)
{
this(
partitionColumns,
domainPredicate,
remainingPredicate,
pushdownFilterEnabled,
@@ -58,27 +54,19 @@ public BaseHiveTableLayoutHandle(
}

public BaseHiveTableLayoutHandle(
List<BaseHiveColumnHandle> partitionColumns,
TupleDomain<Subfield> domainPredicate,
RowExpression remainingPredicate,
boolean pushdownFilterEnabled,
TupleDomain<ColumnHandle> partitionColumnPredicate,
Optional<List<HivePartition>> partitions)
{
this.partitionColumns = ImmutableList.copyOf(requireNonNull(partitionColumns, "partitionColumns is null"));
this.domainPredicate = requireNonNull(domainPredicate, "domainPredicate is null");
this.remainingPredicate = requireNonNull(remainingPredicate, "remainingPredicate is null");
this.pushdownFilterEnabled = pushdownFilterEnabled;
this.partitionColumnPredicate = requireNonNull(partitionColumnPredicate, "partitionColumnPredicate is null");
this.partitions = requireNonNull(partitions, "partitions is null");
}

@JsonProperty
public List<BaseHiveColumnHandle> getPartitionColumns()
{
return partitionColumns;
}

@JsonProperty
public TupleDomain<Subfield> getDomainPredicate()
{
@@ -2566,7 +2566,7 @@ public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHa
.collect(toImmutableSet());

Set<String> partitionColumnNames = layoutHandle.getPartitionColumns().stream()
.map(BaseHiveColumnHandle::getName)
.map(HiveColumnHandle::getName)
.collect(toImmutableSet());

return partitionColumnNames.containsAll(predicateColumnNames);
@@ -3676,7 +3676,7 @@ public TableLayoutFilterCoverage getTableLayoutFilterCoverage(ConnectorTableLayo
{
HiveTableLayoutHandle tableHandle = (HiveTableLayoutHandle) connectorTableLayoutHandle;
Set<String> relevantColumns = tableHandle.getPartitionColumns().stream()
.map(BaseHiveColumnHandle::getName)
.map(HiveColumnHandle::getName)
.filter(relevantPartitionColumns::contains)
.collect(toImmutableSet());
if (relevantColumns.isEmpty()) {
@@ -199,7 +199,7 @@ public ConnectorPageSource createPageSource(
hiveStorageTimeZone,
typeManager,
hiveLayout.getSchemaTableName(),
hiveLayout.getPartitionColumns().stream().map(HiveColumnHandle.class::cast).collect(toList()),
hiveLayout.getPartitionColumns(),
hiveLayout.getDataColumns(),
hiveLayout.getTableParameters(),
hiveSplit.getPartitionDataColumnCount(),
@@ -616,9 +616,7 @@ private static boolean shouldSkipBucket(HiveTableLayoutHandle hiveLayout, HiveSp

private static boolean shouldSkipPartition(TypeManager typeManager, HiveTableLayoutHandle hiveLayout, DateTimeZone hiveStorageTimeZone, HiveSplit hiveSplit, SplitContext splitContext)
{
List<HiveColumnHandle> partitionColumns = hiveLayout.getPartitionColumns().stream()
.map(HiveColumnHandle.class::cast)
.collect(toList());
List<HiveColumnHandle> partitionColumns = hiveLayout.getPartitionColumns();
List<Type> partitionTypes = partitionColumns.stream()
.map(column -> typeManager.getType(column.getTypeSignature()))
.collect(toList());
@@ -36,7 +36,7 @@
*/
public class HivePartitionResult
{
private final List<BaseHiveColumnHandle> partitionColumns;
private final List<HiveColumnHandle> partitionColumns;
private final List<Column> dataColumns;
private final Map<String, String> tableParameters;
private final List<HivePartition> partitions;
@@ -47,7 +47,7 @@ public class HivePartitionResult
private final Optional<HiveBucketFilter> bucketFilter;

public HivePartitionResult(
List<BaseHiveColumnHandle> partitionColumns,
List<HiveColumnHandle> partitionColumns,
List<Column> dataColumns,
Map<String, String> tableParameters,
List<HivePartition> partitions,
@@ -68,7 +68,7 @@ public HivePartitionResult(
this.bucketFilter = requireNonNull(bucketFilter, "bucketFilter is null");
}

public List<BaseHiveColumnHandle> getPartitionColumns()
public List<HiveColumnHandle> getPartitionColumns()
{
return partitionColumns;
}
@@ -44,13 +44,13 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

public class HiveTableLayoutHandle
extends BaseHiveTableLayoutHandle
{
private final SchemaTableName schemaTableName;
private final String tablePath;
private final List<HiveColumnHandle> partitionColumns;
private final List<Column> dataColumns;
private final Map<String, String> tableParameters;
private final Map<String, HiveColumnHandle> predicateColumns;
@@ -89,7 +89,7 @@ public HiveTableLayoutHandle(
this(
schemaTableName,
tablePath,
partitionColumns.stream().map(BaseHiveColumnHandle.class::cast).collect(toList()),
partitionColumns,
dataColumns,
tableParameters,
domainPredicate,
@@ -111,7 +111,7 @@
protected HiveTableLayoutHandle(
SchemaTableName schemaTableName,
String tablePath,
List<BaseHiveColumnHandle> partitionColumns,
List<HiveColumnHandle> partitionColumns,
List<Column> dataColumns,
Map<String, String> tableParameters,
TupleDomain<Subfield> domainPredicate,
@@ -130,7 +130,6 @@ protected HiveTableLayoutHandle(
Optional<HiveTableHandle> hiveTableHandle)
{
super(
partitionColumns,
domainPredicate,
remainingPredicate,
pushdownFilterEnabled,
@@ -139,6 +138,7 @@ protected HiveTableLayoutHandle(

this.schemaTableName = requireNonNull(schemaTableName, "table is null");
this.tablePath = requireNonNull(tablePath, "tablePath is null");
this.partitionColumns = ImmutableList.copyOf(requireNonNull(partitionColumns, "partitionColumns is null"));
this.dataColumns = ImmutableList.copyOf(requireNonNull(dataColumns, "dataColumns is null"));
this.tableParameters = ImmutableMap.copyOf(requireNonNull(tableParameters, "tableProperties is null"));
this.predicateColumns = requireNonNull(predicateColumns, "predicateColumns is null");
@@ -165,6 +165,12 @@ public String getTablePath()
return tablePath;
}

@JsonProperty
public List<HiveColumnHandle> getPartitionColumns()
{
return partitionColumns;
}

@JsonProperty
public List<Column> getDataColumns()
{
@@ -283,7 +289,7 @@ private TupleDomain<ColumnHandle> getConstraint(PlanCanonicalizationStrategy can
// Constants are only removed from point checks, and not range checks. Example:
// `x = 1` is equivalent to `x = 1000`
// `x > 1` is NOT equivalent to `x > 1000`
TupleDomain<ColumnHandle> constraint = createPredicate(ImmutableList.copyOf(getPartitionColumns()), partitions.get());
TupleDomain<ColumnHandle> constraint = createPredicate(ImmutableList.copyOf(partitionColumns), partitions.get());
constraint = getDomainPredicate()
.transform(subfield -> subfield.getPath().isEmpty() ? subfield.getRootName() : null)
.transform(getPredicateColumns()::get)
@@ -355,7 +361,7 @@ public static class Builder
{
private SchemaTableName schemaTableName;
private String tablePath;
private List<BaseHiveColumnHandle> partitionColumns;
private List<HiveColumnHandle> partitionColumns;
private List<Column> dataColumns;
private Map<String, String> tableParameters;
private TupleDomain<Subfield> domainPredicate;
@@ -386,7 +392,7 @@ public Builder setTablePath(String tablePath)
return this;
}

public Builder setPartitionColumns(List<BaseHiveColumnHandle> partitionColumns)
public Builder setPartitionColumns(List<HiveColumnHandle> partitionColumns)
{
this.partitionColumns = partitionColumns;
return this;
@@ -15,7 +15,6 @@

import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.hive.BaseHiveColumnHandle;
import com.facebook.presto.hive.BaseHiveTableLayoutHandle;
import com.facebook.presto.hive.HivePartition;
import com.facebook.presto.hive.metastore.Column;
@@ -32,11 +31,11 @@
import java.util.Set;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

public class IcebergTableLayoutHandle
extends BaseHiveTableLayoutHandle
{
private final List<IcebergColumnHandle> partitionColumns;
private final List<Column> dataColumns;
private final Map<String, IcebergColumnHandle> predicateColumns;
private final Optional<Set<IcebergColumnHandle>> requestedColumns;
@@ -55,7 +54,7 @@ public IcebergTableLayoutHandle(
@JsonProperty("table") IcebergTableHandle table)
{
this(
partitionColumns.stream().map(BaseHiveColumnHandle.class::cast).collect(toList()),
partitionColumns,
dataColumns,
domainPredicate,
remainingPredicate,
@@ -68,7 +67,7 @@ public IcebergTableLayoutHandle(
}

protected IcebergTableLayoutHandle(
List<BaseHiveColumnHandle> partitionColumns,
List<IcebergColumnHandle> partitionColumns,
List<Column> dataColumns,
TupleDomain<Subfield> domainPredicate,
RowExpression remainingPredicate,
@@ -80,19 +79,25 @@ protected IcebergTableLayoutHandle(
IcebergTableHandle table)
{
super(
partitionColumns,
domainPredicate,
remainingPredicate,
pushdownFilterEnabled,
partitionColumnPredicate,
partitions);

this.partitionColumns = ImmutableList.copyOf(requireNonNull(partitionColumns, "partitionColumns is null"));
this.dataColumns = ImmutableList.copyOf(requireNonNull(dataColumns, "dataColumns is null"));
this.predicateColumns = requireNonNull(predicateColumns, "predicateColumns is null");
this.requestedColumns = requireNonNull(requestedColumns, "requestedColumns is null");
this.table = requireNonNull(table, "table is null");
}

@JsonProperty
public List<IcebergColumnHandle> getPartitionColumns()
{
return partitionColumns;
}

@JsonProperty
public List<Column> getDataColumns()
{
@@ -129,7 +134,7 @@ public boolean equals(Object o)
IcebergTableLayoutHandle that = (IcebergTableLayoutHandle) o;
return Objects.equals(getDomainPredicate(), that.getDomainPredicate()) &&
Objects.equals(getRemainingPredicate(), that.getRemainingPredicate()) &&
Objects.equals(getPartitionColumns(), that.getPartitionColumns()) &&
Objects.equals(partitionColumns, that.partitionColumns) &&
Objects.equals(predicateColumns, that.predicateColumns) &&
Objects.equals(requestedColumns, that.requestedColumns) &&
Objects.equals(isPushdownFilterEnabled(), that.isPushdownFilterEnabled()) &&
@@ -151,7 +156,7 @@ public String toString()

public static class Builder
{
private List<BaseHiveColumnHandle> partitionColumns;
private List<IcebergColumnHandle> partitionColumns;
private List<Column> dataColumns;
private TupleDomain<Subfield> domainPredicate;
private RowExpression remainingPredicate;
@@ -162,7 +167,7 @@ public static class Builder
private Optional<List<HivePartition>> partitions;
private IcebergTableHandle table;

public Builder setPartitionColumns(List<BaseHiveColumnHandle> partitionColumns)
public Builder setPartitionColumns(List<IcebergColumnHandle> partitionColumns)
{
this.partitionColumns = partitionColumns;
return this;
4 changes: 4 additions & 0 deletions presto-native-execution/etc/catalog/iceberg.properties
@@ -0,0 +1,4 @@
# The Presto "iceberg" catalog is handled by the hive connector in Presto native execution.
connector.name=hive

cache.enabled=true