Introduce BaseHiveTableHandle
Refactor HiveTableHandle and IcebergTableHandle to extend BaseHiveTableHandle
imjalpreet authored and yingsu00 committed Jan 18, 2024
1 parent 38df84d commit f75610e
Showing 12 changed files with 104 additions and 89 deletions.
[new file: BaseHiveTableHandle.java]
@@ -0,0 +1,54 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.SchemaTableName;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import static java.util.Objects.requireNonNull;

public class BaseHiveTableHandle
implements ConnectorTableHandle
{
private final String schemaName;
private final String tableName;

@JsonCreator
public BaseHiveTableHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
}

@JsonProperty
public String getSchemaName()
{
return schemaName;
}

@JsonProperty
public String getTableName()
{
return tableName;
}

public SchemaTableName getSchemaTableName()
{
return new SchemaTableName(schemaName, tableName);
}
}
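
For illustration, a minimal sketch of a connector handle built on the new base class. ExampleTableHandle is a hypothetical name used only here; the real subclasses (HiveTableHandle, IcebergTableHandle) appear in the diffs below.

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

// Hypothetical subclass: getSchemaName(), getTableName(), and
// getSchemaTableName() are all inherited from BaseHiveTableHandle.
public class ExampleTableHandle
        extends BaseHiveTableHandle
{
    @JsonCreator
    public ExampleTableHandle(
            @JsonProperty("schemaName") String schemaName,
            @JsonProperty("tableName") String tableName)
    {
        super(schemaName, tableName);
    }
}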
[changed file: HiveTableHandle.java]
@@ -13,8 +13,6 @@
  */
 package com.facebook.presto.hive;
 
-import com.facebook.presto.spi.ConnectorTableHandle;
-import com.facebook.presto.spi.SchemaTableName;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -26,11 +24,8 @@
 import static java.util.Objects.requireNonNull;
 
 public class HiveTableHandle
-        implements ConnectorTableHandle
+        extends BaseHiveTableHandle
 {
-    private final String schemaName;
-    private final String tableName;
-
     private final Optional<List<List<String>>> analyzePartitionValues;
 
     @JsonCreator
@@ -39,8 +34,8 @@ public HiveTableHandle(
             @JsonProperty("tableName") String tableName,
             @JsonProperty("analyzePartitionValues") Optional<List<List<String>>> analyzePartitionValues)
     {
-        this.schemaName = requireNonNull(schemaName, "schemaName is null");
-        this.tableName = requireNonNull(tableName, "tableName is null");
+        super(schemaName, tableName);
+
         this.analyzePartitionValues = requireNonNull(analyzePartitionValues, "analyzePartitionValues is null");
     }
 
@@ -51,19 +46,7 @@ public HiveTableHandle(String schemaName, String tableName)
 
     public HiveTableHandle withAnalyzePartitionValues(Optional<List<List<String>>> analyzePartitionValues)
     {
-        return new HiveTableHandle(schemaName, tableName, analyzePartitionValues);
-    }
-
-    @JsonProperty
-    public String getSchemaName()
-    {
-        return schemaName;
-    }
-
-    @JsonProperty
-    public String getTableName()
-    {
-        return tableName;
+        return new HiveTableHandle(getSchemaName(), getTableName(), analyzePartitionValues);
     }
 
     @JsonProperty
@@ -72,11 +55,6 @@ public Optional<List<List<String>>> getAnalyzePartitionValues()
         return analyzePartitionValues;
     }
 
-    public SchemaTableName getSchemaTableName()
-    {
-        return new SchemaTableName(schemaName, tableName);
-    }
-
     @Override
     public boolean equals(Object o)
     {
@@ -88,23 +66,23 @@ public boolean equals(Object o)
         }
         HiveTableHandle that = (HiveTableHandle) o;
         // Do not include analyzePartitionValues in hashCode and equals comparison
-        return Objects.equals(schemaName, that.schemaName) &&
-                Objects.equals(tableName, that.tableName);
+        return Objects.equals(getSchemaName(), that.getSchemaName()) &&
+                Objects.equals(getTableName(), that.getTableName());
     }
 
     @Override
     public int hashCode()
     {
         // Do not include analyzePartitionValues in hashCode and equals comparison
-        return Objects.hash(schemaName, tableName);
+        return Objects.hash(getSchemaName(), getTableName());
     }
 
     @Override
     public String toString()
     {
         return toStringHelper(this)
-                .add("schemaName", schemaName)
-                .add("tableName", tableName)
+                .add("schemaName", getSchemaName())
+                .add("tableName", getTableName())
                 .add("analyzePartitionValues", analyzePartitionValues)
                 .toString();
     }
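
As the hunks above show, handle identity still rests on schema and table name alone. A quick sanity sketch of that contract, assuming hypothetical table names and Guava's ImmutableList:

// Hypothetical values; uses the two-argument convenience constructor above.
HiveTableHandle a = new HiveTableHandle("web", "orders");
HiveTableHandle b = a.withAnalyzePartitionValues(
        Optional.of(ImmutableList.of(ImmutableList.of("2024-01-18"))));
// analyzePartitionValues is deliberately excluded from equals/hashCode,
// so both handles compare equal and hash identically.
assert a.equals(b) && a.hashCode() == b.hashCode();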
[next changed file]
@@ -328,7 +328,7 @@ protected Optional<SystemTable> getIcebergSystemTable(SchemaTableName tableName,
     public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
     {
         IcebergTableHandle icebergTableHandle = (IcebergTableHandle) table;
-        return getTableMetadata(session, icebergTableHandle.getSchemaTableName(), icebergTableHandle.getTableName());
+        return getTableMetadata(session, icebergTableHandle.getSchemaTableName(), icebergTableHandle.getIcebergTableName());
     }
 
     protected ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName table, IcebergTableName icebergTableName)
@@ -395,7 +395,7 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(IcebergTableHandle
 
         return new IcebergWritableTableHandle(
                 table.getSchemaName(),
-                table.getTableName(),
+                table.getIcebergTableName(),
                 SchemaParser.toJson(icebergTable.schema()),
                 PartitionSpecParser.toJson(icebergTable.spec()),
                 getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
@@ -561,7 +561,7 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
         }
 
         IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
-        verify(handle.getTableName().getTableType() == DATA, "only the data table can have columns added");
+        verify(handle.getIcebergTableName().getTableType() == DATA, "only the data table can have columns added");
         Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
         Transaction transaction = icebergTable.newTransaction();
         transaction.updateSchema().addColumn(column.getName(), columnType, column.getComment()).commit();
@@ -578,7 +578,7 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandl
     {
         IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
         IcebergColumnHandle handle = (IcebergColumnHandle) column;
-        verify(icebergTableHandle.getTableName().getTableType() == DATA, "only the data table can have columns dropped");
+        verify(icebergTableHandle.getIcebergTableName().getTableType() == DATA, "only the data table can have columns dropped");
         Table icebergTable = getIcebergTable(session, icebergTableHandle.getSchemaTableName());
 
         // Currently drop partition column used in any partition specs of a table would introduce some problems in Iceberg.
@@ -598,7 +598,7 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandl
     public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle source, String target)
     {
         IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
-        verify(icebergTableHandle.getTableName().getTableType() == DATA, "only the data table can have columns renamed");
+        verify(icebergTableHandle.getIcebergTableName().getTableType() == DATA, "only the data table can have columns renamed");
         IcebergColumnHandle columnHandle = (IcebergColumnHandle) source;
         Table icebergTable = getIcebergTable(session, icebergTableHandle.getSchemaTableName());
         Transaction transaction = icebergTable.newTransaction();
@@ -613,7 +613,7 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan
     public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
         IcebergTableHandle table = (IcebergTableHandle) tableHandle;
-        verify(table.getTableName().getTableType() == DATA, "only the data table can have data inserted");
+        verify(table.getIcebergTableName().getTableType() == DATA, "only the data table can have data inserted");
         Table icebergTable = getIcebergTable(session, table.getSchemaTableName());
         validateTableMode(session, icebergTable);
 
@@ -626,7 +626,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
         IcebergTableHandle table = (IcebergTableHandle) tableHandle;
         Table icebergTable = getIcebergTable(session, table.getSchemaTableName());
         Schema schema;
-        if (table.getTableName().getTableType() == CHANGELOG) {
+        if (table.getIcebergTableName().getTableType() == CHANGELOG) {
             schema = ChangelogUtil.changelogTableSchema(getRowTypeFromColumnMeta(getColumnMetadatas(icebergTable)));
         }
         else {
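
The IcebergTableHandle diff itself is not among the hunks shown on this page. Judging from the call sites above, its post-refactor shape is roughly the following sketch; the constructor and any additional fields are assumptions, not confirmed by this page.

// Sketch only: the real class likely carries more state than shown here.
public class IcebergTableHandle
        extends BaseHiveTableHandle
{
    private final IcebergTableName icebergTableName;

    public IcebergTableHandle(String schemaName, IcebergTableName icebergTableName)
    {
        // Assumption: the base class holds the plain table-name component.
        super(schemaName, icebergTableName.getTableName());
        this.icebergTableName = icebergTableName;
    }

    // Renamed from getTableName() so it no longer collides with the
    // String-returning accessor inherited from BaseHiveTableHandle.
    public IcebergTableName getIcebergTableName()
    {
        return icebergTableName;
    }
}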
[next changed file]
@@ -333,7 +333,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
     public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
         IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
-        verify(handle.getTableName().getTableType() == DATA, "only the data table can be dropped");
+        verify(handle.getIcebergTableName().getTableType() == DATA, "only the data table can be dropped");
         // TODO: support path override in Iceberg table creation
         org.apache.iceberg.Table table = getIcebergTable(session, handle.getSchemaTableName());
         Optional<Map<String, String>> tableProperties = tryGetProperties(table);
@@ -345,15 +345,15 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
                 throw new PrestoException(NOT_SUPPORTED, "Table " + handle.getSchemaTableName() + " contains Iceberg path override properties and cannot be dropped from Presto");
             }
         }
-        metastore.dropTable(getMetastoreContext(session), handle.getSchemaName(), handle.getTableName().getTableName(), true);
+        metastore.dropTable(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName(), true);
     }
 
     @Override
     public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTable)
     {
         IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
-        verify(handle.getTableName().getTableType() == DATA, "only the data table can be renamed");
-        metastore.renameTable(getMetastoreContext(session), handle.getSchemaName(), handle.getTableName().getTableName(), newTable.getSchemaName(), newTable.getTableName());
+        verify(handle.getIcebergTableName().getTableType() == DATA, "only the data table can be renamed");
+        metastore.renameTable(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName(), newTable.getSchemaName(), newTable.getTableName());
     }
 
     @Override
@@ -470,7 +470,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
                 return new VariableReferenceExpression(Optional.empty(), columnHandle.getName(), columnHandle.getType());
             });
             RowExpression translatedPredicate = rowExpressionService.getDomainTranslator().toPredicate(predicate);
-            PartitionStatistics hiveStatistics = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getTableName().getTableName());
+            PartitionStatistics hiveStatistics = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName());
             TableStatistics mergedStatistics = mergeHiveStatistics(icebergStatistics, hiveStatistics, mergeStrategy, icebergTable.spec());
             TableStatistics.Builder filteredStatsBuilder = TableStatistics.builder()
                     .setRowCount(mergedStatistics.getRowCount());
@@ -498,7 +498,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
                             IcebergColumnHandle::getType)));
         }).orElseGet(() -> {
             if (!mergeStrategy.equals(HiveStatisticsMergeStrategy.NONE)) {
-                PartitionStatistics hiveStats = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getTableName().getTableName());
+                PartitionStatistics hiveStats = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName());
                 return mergeHiveStatistics(icebergStatistics, hiveStats, mergeStrategy, icebergTable.spec());
             }
             return icebergStatistics;
[next changed file]
@@ -207,7 +207,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
     public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
         IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
-        verify(icebergTableHandle.getTableName().getTableType() == DATA, "only the data table can be dropped");
+        verify(icebergTableHandle.getIcebergTableName().getTableType() == DATA, "only the data table can be dropped");
         TableIdentifier tableIdentifier = toIcebergTableIdentifier(icebergTableHandle.getSchemaTableName());
         resourceFactory.getCatalog(session).dropTable(tableIdentifier);
     }
@@ -216,7 +216,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
     public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTable)
     {
         IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
-        verify(icebergTableHandle.getTableName().getTableType() == DATA, "only the data table can be renamed");
+        verify(icebergTableHandle.getIcebergTableName().getTableType() == DATA, "only the data table can be renamed");
         TableIdentifier from = toIcebergTableIdentifier(icebergTableHandle.getSchemaTableName());
         TableIdentifier to = toIcebergTableIdentifier(newTable);
         resourceFactory.getCatalog(session).renameTable(from, to);
[next changed file]
@@ -737,7 +737,7 @@ public ConnectorPageSource createPageSource(
                 .forEach(regularColumns::add);
 
         // TODO: pushdownFilter for icebergLayout
-        HdfsContext hdfsContext = new HdfsContext(session, table.getSchemaName(), table.getTableName().getTableName());
+        HdfsContext hdfsContext = new HdfsContext(session, table.getSchemaName(), table.getIcebergTableName().getTableName());
         ConnectorPageSourceWithRowPositions connectorPageSourceWithRowPositions = createDataPageSource(
                 session,
                 hdfsContext,
[next changed file]
@@ -63,7 +63,7 @@ public ConnectorSplitSource getSplits(
         IcebergTableLayoutHandle layoutHandle = (IcebergTableLayoutHandle) layout;
         IcebergTableHandle table = layoutHandle.getTable();
 
-        if (!table.getTableName().getSnapshotId().isPresent()) {
+        if (!table.getIcebergTableName().getSnapshotId().isPresent()) {
             return new FixedSplitSource(ImmutableList.of());
         }
 
@@ -77,10 +77,10 @@
 
         Table icebergTable = getIcebergTable(transactionManager.get(transaction), session, table.getSchemaTableName());
 
-        if (table.getTableName().getTableType() == CHANGELOG) {
+        if (table.getIcebergTableName().getTableType() == CHANGELOG) {
             // if the snapshot isn't specified, grab the oldest available version of the table
-            long fromSnapshot = table.getTableName().getSnapshotId().orElseGet(() -> SnapshotUtil.oldestAncestor(icebergTable).snapshotId());
-            long toSnapshot = table.getTableName().getChangelogEndSnapshot().orElse(icebergTable.currentSnapshot().snapshotId());
+            long fromSnapshot = table.getIcebergTableName().getSnapshotId().orElseGet(() -> SnapshotUtil.oldestAncestor(icebergTable).snapshotId());
+            long toSnapshot = table.getIcebergTableName().getChangelogEndSnapshot().orElse(icebergTable.currentSnapshot().snapshotId());
             IncrementalChangelogScan scan = icebergTable.newIncrementalChangelogScan()
                     .fromSnapshotExclusive(fromSnapshot)
                     .toSnapshot(toSnapshot);
@@ -89,7 +89,7 @@
         else {
             TableScan tableScan = icebergTable.newScan()
                     .filter(toIcebergExpression(predicate))
-                    .useSnapshot(table.getTableName().getSnapshotId().get());
+                    .useSnapshot(table.getIcebergTableName().getSnapshotId().get());
 
             // TODO Use residual. Right now there is no way to propagate residual to presto but at least we can
             // propagate it at split level so the parquet pushdown can leverage it.
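
For reference, the IcebergTableName accessors exercised throughout these hunks, collected into one assumed sketch. The class is not part of the displayed diff, so the field set and constructor are inferred from the call sites; TableType stands for the enum supplying the DATA and CHANGELOG constants.

import java.util.Optional;

import static java.util.Objects.requireNonNull;

// Assumed shape of IcebergTableName, inferred from the call sites above.
public class IcebergTableName
{
    private final String tableName;                    // plain table name
    private final TableType tableType;                 // e.g. DATA or CHANGELOG
    private final Optional<Long> snapshotId;           // pinned snapshot, if any
    private final Optional<Long> changelogEndSnapshot; // end of a changelog range

    public IcebergTableName(String tableName, TableType tableType, Optional<Long> snapshotId, Optional<Long> changelogEndSnapshot)
    {
        this.tableName = requireNonNull(tableName, "tableName is null");
        this.tableType = requireNonNull(tableType, "tableType is null");
        this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
        this.changelogEndSnapshot = requireNonNull(changelogEndSnapshot, "changelogEndSnapshot is null");
    }

    public String getTableName()
    {
        return tableName;
    }

    public TableType getTableType()
    {
        return tableType;
    }

    public Optional<Long> getSnapshotId()
    {
        return snapshotId;
    }

    public Optional<Long> getChangelogEndSnapshot()
    {
        return changelogEndSnapshot;
    }
}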