Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[iceberg] [native] Add support for V1 tables #21584

Merged
merged 4 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions presto-native-execution/etc/catalog/hive.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
connector.name=hive

cache.enabled=true
2 changes: 2 additions & 0 deletions presto-native-execution/etc/catalog/iceberg.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# The Presto "iceberg" catalog is handled by the hive connector in Presto native execution.
connector.name=hive
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,18 @@ std::shared_ptr<connector::ColumnHandle> toColumnHandle(
toRequiredSubfields(hiveColumn->requiredSubfields));
}

if (auto icebergColumn =
yingsu00 marked this conversation as resolved.
Show resolved Hide resolved
dynamic_cast<const protocol::IcebergColumnHandle*>(column)) {
// TODO(imjalpreet): Modify 'hiveType' argument of the 'HiveColumnHandle'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What needs to be done on hiveType?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of the hive connector, HiveColumnHandle(Presto) also has a parameter of type HiveType which is being used while creating the HiveColumnHandle velox object. We don't have the same in IcebergColumnHandle yet.

The prestissimo change was added as part of this commit in the case of hive connector: 6570b00

majetideepak marked this conversation as resolved.
Show resolved Hide resolved
// constructor similar to how Hive Connector is handling for bucketing
return std::make_shared<connector::hive::HiveColumnHandle>(
icebergColumn->columnIdentity.name,
toHiveColumnType(icebergColumn->columnType),
stringToType(icebergColumn->type, typeParser),
stringToType(icebergColumn->type, typeParser),
toRequiredSubfields(icebergColumn->requiredSubfields));
}

if (auto tpchColumn =
dynamic_cast<const protocol::TpchColumnHandle*>(column)) {
return std::make_shared<connector::tpch::TpchColumnHandle>(
Expand Down Expand Up @@ -856,6 +868,81 @@ TypePtr fieldNamesToLowerCase<TypeKind::ROW>(const TypePtr& type) {
return std::make_shared<RowType>(std::move(names), std::move(types));
}

std::shared_ptr<connector::ConnectorTableHandle> toHiveTableHandle(
const protocol::TupleDomain<protocol::Subfield>& domainPredicate,
const std::shared_ptr<protocol::RowExpression>& remainingPredicate,
bool isPushdownFilterEnabled,
const std::string& tableName,
const protocol::List<protocol::Column>& dataColumns,
const protocol::TableHandle& tableHandle,
const protocol::Map<protocol::String, protocol::String>& tableParameters,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser) {
connector::hive::SubfieldFilters subfieldFilters;
auto domains = domainPredicate.domains;
for (const auto& domain : *domains) {
auto filter = domain.second;
subfieldFilters[common::Subfield(domain.first)] =
toFilter(domain.second, exprConverter, typeParser);
}

auto remainingFilter = exprConverter.toVeloxExpr(remainingPredicate);
if (auto constant = std::dynamic_pointer_cast<const core::ConstantTypedExpr>(
remainingFilter)) {
bool value = constant->value().value<bool>();
VELOX_CHECK(value, "Unexpected always-false remaining predicate");

// Use null for always-true filter.
remainingFilter = nullptr;
}

RowTypePtr finalDataColumns;
if (!dataColumns.empty()) {
std::vector<std::string> names;
std::vector<TypePtr> types;
velox::type::fbhive::HiveTypeParser typeParser;
names.reserve(dataColumns.size());
types.reserve(dataColumns.size());
for (auto& column : dataColumns) {
std::string name = column.name;
folly::toLowerAscii(name);
names.emplace_back(std::move(name));
auto parsedType = typeParser.parse(column.type);
// The type from the metastore may have upper case letters
// in field names, convert them all to lower case to be
// compatible with Presto.
types.push_back(VELOX_DYNAMIC_TYPE_DISPATCH(
fieldNamesToLowerCase, parsedType->kind(), parsedType));
}
finalDataColumns = ROW(std::move(names), std::move(types));
}

if (tableParameters.empty()) {
return std::make_shared<connector::hive::HiveTableHandle>(
tableHandle.connectorId,
tableName,
isPushdownFilterEnabled,
std::move(subfieldFilters),
remainingFilter,
finalDataColumns);
}

std::unordered_map<std::string, std::string> finalTableParameters = {};
finalTableParameters.reserve(tableParameters.size());
for (const auto& [key, value] : tableParameters) {
finalTableParameters[key] = value;
}

return std::make_shared<connector::hive::HiveTableHandle>(
tableHandle.connectorId,
tableName,
isPushdownFilterEnabled,
std::move(subfieldFilters),
remainingFilter,
finalDataColumns,
finalTableParameters);
}

std::shared_ptr<connector::ConnectorTableHandle> toConnectorTableHandle(
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
Expand All @@ -869,47 +956,6 @@ std::shared_ptr<connector::ConnectorTableHandle> toConnectorTableHandle(
partitionColumns.emplace(entry.name, toColumnHandle(&entry, typeParser));
}

connector::hive::SubfieldFilters subfieldFilters;
auto domains = hiveLayout->domainPredicate.domains;
for (const auto& domain : *domains) {
auto filter = domain.second;
subfieldFilters[common::Subfield(domain.first)] =
toFilter(domain.second, exprConverter, typeParser);
}

auto remainingFilter =
exprConverter.toVeloxExpr(hiveLayout->remainingPredicate);
if (auto constant =
std::dynamic_pointer_cast<const core::ConstantTypedExpr>(
remainingFilter)) {
bool value = constant->value().value<bool>();
VELOX_CHECK(value, "Unexpected always-false remaining predicate");

// Use null for always-true filter.
remainingFilter = nullptr;
}

RowTypePtr dataColumns;
if (!hiveLayout->dataColumns.empty()) {
std::vector<std::string> names;
std::vector<TypePtr> types;
velox::type::fbhive::HiveTypeParser typeParser;
names.reserve(hiveLayout->dataColumns.size());
types.reserve(hiveLayout->dataColumns.size());
for (auto& column : hiveLayout->dataColumns) {
std::string name = column.name;
folly::toLowerAscii(name);
names.emplace_back(std::move(name));
auto parsedType = typeParser.parse(column.type);
// The type from the metastore may have upper case letters
// in field names, convert them all to lower case to be
// compatible with Presto.
types.push_back(VELOX_DYNAMIC_TYPE_DISPATCH(
fieldNamesToLowerCase, parsedType->kind(), parsedType));
}
dataColumns = ROW(std::move(names), std::move(types));
}

auto hiveTableHandle =
std::dynamic_pointer_cast<const protocol::HiveTableHandle>(
tableHandle.connectorHandle);
Expand All @@ -921,20 +967,49 @@ std::shared_ptr<connector::ConnectorTableHandle> toConnectorTableHandle(
: fmt::format(
"{}.{}", hiveTableHandle->schemaName, hiveTableHandle->tableName);

std::unordered_map<std::string, std::string> tableParameters;
tableParameters.reserve(hiveLayout->tableParameters.size());
for (const auto& [key, value] : hiveLayout->tableParameters) {
tableParameters[key] = value;
return toHiveTableHandle(
hiveLayout->domainPredicate,
hiveLayout->remainingPredicate,
hiveLayout->pushdownFilterEnabled,
tableName,
hiveLayout->dataColumns,
tableHandle,
hiveLayout->tableParameters,
exprConverter,
typeParser);
}

if (auto icebergLayout =
std::dynamic_pointer_cast<const protocol::IcebergTableLayoutHandle>(
tableHandle.connectorTableLayout)) {
for (const auto& entry : icebergLayout->partitionColumns) {
partitionColumns.emplace(
entry.columnIdentity.name, toColumnHandle(&entry, typeParser));
}

return std::make_shared<connector::hive::HiveTableHandle>(
tableHandle.connectorId,
auto icebergTableHandle =
std::dynamic_pointer_cast<const protocol::IcebergTableHandle>(
tableHandle.connectorHandle);
VELOX_CHECK_NOT_NULL(icebergTableHandle);

// Use fully qualified name if available.
std::string tableName = icebergTableHandle->schemaName.empty()
? icebergTableHandle->icebergTableName.tableName
: fmt::format(
"{}.{}",
icebergTableHandle->schemaName,
icebergTableHandle->icebergTableName.tableName);

return toHiveTableHandle(
icebergLayout->domainPredicate,
icebergLayout->remainingPredicate,
icebergLayout->pushdownFilterEnabled,
tableName,
hiveLayout->pushdownFilterEnabled,
std::move(subfieldFilters),
remainingFilter,
dataColumns,
tableParameters);
icebergLayout->dataColumns,
tableHandle,
{},
exprConverter,
typeParser);
}

if (auto tpchLayout =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ dwio::common::FileFormat toVeloxFileFormat(
"Unsupported file format: {} {}", format.inputFormat, format.serDe);
}

dwio::common::FileFormat toVeloxFileFormat(
const presto::protocol::FileFormat format) {
if (format == protocol::FileFormat::ORC) {
return dwio::common::FileFormat::DWRF;
} else if (format == protocol::FileFormat::PARQUET) {
return dwio::common::FileFormat::PARQUET;
}
VELOX_UNSUPPORTED("Unsupported file format: {}", fmt::underlying(format));
}

} // anonymous namespace

velox::exec::Split toVeloxSplit(
Expand Down Expand Up @@ -91,6 +101,30 @@ velox::exec::Split toVeloxSplit(
serdeParameters),
splitGroupId);
}

if (auto icebergSplit =
yingsu00 marked this conversation as resolved.
Show resolved Hide resolved
std::dynamic_pointer_cast<const protocol::IcebergSplit>(
connectorSplit)) {
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
for (const auto& entry : icebergSplit->partitionKeys) {
majetideepak marked this conversation as resolved.
Show resolved Hide resolved
partitionKeys.emplace(
entry.second.name,
entry.second.value == nullptr
? std::nullopt
: std::optional<std::string>{*entry.second.value});
}

return velox::exec::Split(
std::make_shared<connector::hive::HiveConnectorSplit>(
scheduledSplit.split.connectorId,
icebergSplit->path,
toVeloxFileFormat(icebergSplit->fileFormat),
icebergSplit->start,
icebergSplit->length,
partitionKeys),
splitGroupId);
}

if (auto remoteSplit = std::dynamic_pointer_cast<const protocol::RemoteSplit>(
connectorSplit)) {
return velox::exec::Split(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ bool unregisterConnector(const std::string& connectorName) {
void registerHiveConnectors() {
registerConnector("hive", "hive");
registerConnector("hive-hadoop2", "hive");
registerConnector("iceberg", "hive-iceberg");
}

void registerTpchConnector() {
Expand Down
Loading