Skip to content

Commit

Permalink
review points
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe-Abraham committed Feb 3, 2025
1 parent 9207141 commit 2024364
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 73 deletions.
6 changes: 3 additions & 3 deletions presto-native-execution/presto_cpp/main/types/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ add_library(
presto_types OBJECT
PrestoToVeloxQueryPlan.cpp PrestoToVeloxExpr.cpp VeloxPlanValidator.cpp
PrestoToVeloxSplit.cpp PrestoToVeloxConnector.cpp)

add_dependencies(presto_types presto_operators presto_type_converter velox_type
velox_type_fbhive)

target_link_libraries(presto_types presto_type_converter velox_type_fbhive
velox_hive_partition_function velox_tpch_gen velox_functions_json)
target_link_libraries(
presto_types presto_type_converter velox_type_fbhive
velox_hive_partition_function velox_tpch_gen velox_functions_json)

if(PRESTO_ENABLE_REMOTE_FUNCTIONS)
add_dependencies(presto_types velox_expression presto_server_remote_function
Expand Down
117 changes: 57 additions & 60 deletions presto-native-execution/presto_cpp/main/types/PrestoToVeloxExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,61 @@ std::string getFunctionName(const protocol::SqlFunctionId& functionId) {
: functionId;
}

#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS
TypedExprPtr VeloxExprConverter::registerRestRemoteFunction(
const protocol::RestFunctionHandle& restFunctionHandle,
const std::vector<TypedExprPtr>& args,
const TypePtr& returnType) const {
const auto* systemConfig = SystemConfig::instance();

velox::functions::RemoteVectorFunctionMetadata metadata;
const auto& serdeName = systemConfig->remoteFunctionServerSerde();
if (serdeName == "presto_page") {
metadata.serdeFormat = velox::functions::PageFormat::PRESTO_PAGE;
} else {
VELOX_FAIL(
"presto_page serde is expected by remote function server but got : '{}'",
serdeName);
}
metadata.location = systemConfig->remoteFunctionRestUrl();
metadata.functionId = restFunctionHandle.functionId;
metadata.version = restFunctionHandle.version;

const auto& prestoSignature = restFunctionHandle.signature;
velox::exec::FunctionSignatureBuilder signatureBuilder;

for (const auto& typeVar : prestoSignature.typeVariableConstraints) {
signatureBuilder.typeVariable(typeVar.name);
}

for (const auto& longVar : prestoSignature.longVariableConstraints) {
signatureBuilder.integerVariable(longVar.name);
}

signatureBuilder.returnType(prestoSignature.returnType);

for (const auto& argType : prestoSignature.argumentTypes) {
signatureBuilder.argumentType(argType);
}

if (prestoSignature.variableArity) {
signatureBuilder.variableArity();
}

auto signature = signatureBuilder.build();
std::vector<velox::exec::FunctionSignaturePtr> veloxSignatures = {signature};

velox::functions::registerRemoteFunction(
getFunctionName(restFunctionHandle.functionId),
veloxSignatures,
metadata,
false);

return std::make_shared<CallTypedExpr>(
returnType, args, getFunctionName(restFunctionHandle.functionId));
}
#endif

} // namespace

velox::variant VeloxExprConverter::getConstantValue(
Expand Down Expand Up @@ -466,18 +521,6 @@ std::optional<TypedExprPtr> VeloxExprConverter::tryConvertLike(
returnType, args, getFunctionName(signature));
}

#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS
PageFormat fromSerdeString(const std::string_view& serdeName) {
if (serdeName == "presto_page") {
return PageFormat::PRESTO_PAGE;
} else {
VELOX_FAIL(
"presto_page serde is expected by remote function server but got : '{}'",
serdeName);
}
}
#endif

TypedExprPtr VeloxExprConverter::toVeloxExpr(
const protocol::CallExpression& pexpr) const {
if (auto builtin = std::dynamic_pointer_cast<protocol::BuiltInFunctionHandle>(
Expand Down Expand Up @@ -533,57 +576,11 @@ TypedExprPtr VeloxExprConverter::toVeloxExpr(
auto restFunctionHandle =
std::dynamic_pointer_cast<protocol::RestFunctionHandle>(
pexpr.functionHandle)) {

// Defer to our new helper function for restFunctionHandle.
auto args = toVeloxExpr(pexpr.arguments);
auto returnType = typeParser_->parse(pexpr.returnType);

const auto* systemConfig = SystemConfig::instance();

velox::functions::RemoteVectorFunctionMetadata metadata;
metadata.serdeFormat =
fromSerdeString(systemConfig->remoteFunctionServerSerde());
metadata.location = systemConfig->remoteFunctionRestUrl();
metadata.functionId = restFunctionHandle->functionId;
metadata.version = restFunctionHandle->version;

const auto& prestoSignature = restFunctionHandle->signature;
// parseTypeSignature
velox::exec::FunctionSignatureBuilder signatureBuilder;
// Handle type variable constraints
for (const auto& typeVar : prestoSignature.typeVariableConstraints) {
signatureBuilder.typeVariable(typeVar.name);
}

// Handle long variable constraints (for integer variables)
for (const auto& longVar : prestoSignature.longVariableConstraints) {
signatureBuilder.integerVariable(longVar.name);
}

// Handle return type
signatureBuilder.returnType(prestoSignature.returnType);

// Handle argument types
for (const auto& argType : prestoSignature.argumentTypes) {
signatureBuilder.argumentType(argType);
}

// Handle variable arity
if (prestoSignature.variableArity) {
signatureBuilder.variableArity();
}

auto signature = signatureBuilder.build();
std::vector<velox::exec::FunctionSignaturePtr> veloxSignatures = {
signature};

velox::functions::registerRemoteFunction(
getFunctionName(restFunctionHandle->functionId),
veloxSignatures,
metadata,
false);

return std::make_shared<CallTypedExpr>(
returnType, args, getFunctionName(restFunctionHandle->functionId));
return registerRestRemoteFunction(*restFunctionHandle, args, returnType);
}
#endif
VELOX_FAIL("Unsupported function handle: {}", pexpr.functionHandle->_type);
Expand Down
10 changes: 0 additions & 10 deletions presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,6 @@ target_link_libraries(
${GFLAGS_LIBRARIES}
pthread)

if(PRESTO_ENABLE_REMOTE_FUNCTIONS)
add_dependencies(presto_expressions_test presto_server_remote_function
velox_expression velox_functions_remote)

target_link_libraries(
presto_expressions_test GTest::gmock GTest::gmock_main
presto_server_remote_function velox_expression velox_functions_remote)

endif()

set_property(TARGET presto_expressions_test PROPERTY JOB_POOL_LINK
presto_link_job_pool)

Expand Down

0 comments on commit 2024364

Please sign in to comment.