From 9d0787dcf3ad6e52f0c9aa0c5c4524d3ad66afb4 Mon Sep 17 00:00:00 2001 From: Joe Abraham Date: Tue, 3 Dec 2024 11:33:43 +0530 Subject: [PATCH] review points fixed --- velox/functions/remote/client/Remote.cpp | 7 +- velox/functions/remote/client/Remote.h | 6 +- velox/functions/remote/client/RestClient.cpp | 12 ++-- velox/functions/remote/client/RestClient.h | 4 +- .../remote/server/RemoteFunctionHelper.h | 67 +++++++++++++++++++ .../server/RemoteFunctionRestService.cpp | 43 +----------- .../remote/server/RemoteFunctionService.cpp | 45 +------------ 7 files changed, 83 insertions(+), 101 deletions(-) create mode 100644 velox/functions/remote/server/RemoteFunctionHelper.h diff --git a/velox/functions/remote/client/Remote.cpp b/velox/functions/remote/client/Remote.cpp index e8572099bdf56..aa58a9e75881f 100644 --- a/velox/functions/remote/client/Remote.cpp +++ b/velox/functions/remote/client/Remote.cpp @@ -20,19 +20,18 @@ #include #include #include + #include "velox/common/memory/ByteStream.h" #include "velox/expression/Expr.h" #include "velox/expression/VectorFunction.h" +#include "velox/functions/remote/client/RestClient.h" #include "velox/functions/remote/client/ThriftClient.h" #include "velox/functions/remote/if/GetSerde.h" #include "velox/functions/remote/if/gen-cpp2/RemoteFunctionServiceAsyncClient.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/type/fbhive/HiveTypeSerializer.h" -#include "velox/vector/FlatVector.h" #include "velox/vector/VectorStream.h" -#include "velox/functions/remote/client/RestClient.h" - using namespace folly; namespace facebook::velox::functions { namespace { @@ -140,7 +139,7 @@ class RemoteFunction : public exec::VectorFunction { metadata_.version.value_or("1")); std::unique_ptr responseBody = - restClient_->performCurlRequest(fullUrl, std::move(requestBody)); + restClient_->invokeFunction(fullUrl, std::move(requestBody)); auto outputRowVector = IOBufToRowVector( *responseBody, ROW({outputType}), *context.pool(), &serde); diff --git a/velox/functions/remote/client/Remote.h b/velox/functions/remote/client/Remote.h index 09ea16dc426e2..16fa1db37ae90 100644 --- a/velox/functions/remote/client/Remote.h +++ b/velox/functions/remote/client/Remote.h @@ -25,7 +25,7 @@ namespace facebook::velox::functions { struct RemoteVectorFunctionMetadata : public exec::VectorFunctionMetadata { /// URL of the HTTP/REST server for remote function. - /// Or Network address of the servr to communicate with. Note that this can + /// Or Network address of the server to communicate with. Note that this can /// hold a network location (ip/port pair) or a unix domain socket path (see /// SocketAddress::makeFromPath()). boost::variant location; @@ -55,8 +55,8 @@ struct RemoteVectorFunctionMetadata : public exec::VectorFunctionMetadata { // /// Remote functions are registered as regular statufull functions (using the /// same internal catalog), and hence conflict if there already exists a -/// (non-remote) function registered with the same name. The `overwrite` -/// flagwrite controls whether to overwrite in these cases. +/// (non-remote) function registered with the same name. The `overwrite` flag +/// controls whether to overwrite in these cases. void registerRemoteFunction( const std::string& name, std::vector signatures, diff --git a/velox/functions/remote/client/RestClient.cpp b/velox/functions/remote/client/RestClient.cpp index 0952162a95a3d..228b853b46c29 100644 --- a/velox/functions/remote/client/RestClient.cpp +++ b/velox/functions/remote/client/RestClient.cpp @@ -40,16 +40,16 @@ size_t readCallback(char* dest, size_t size, size_t nmemb, void* userp) { return totalCopied; } -size_t writeCallback(char* ptr, size_t size, size_t nmemb, void* userdata) { - auto* outputBuf = static_cast(userdata); - size_t total_size = size * nmemb; - auto buf = IOBuf::copyBuffer(ptr, total_size); +size_t writeCallback(char* ptr, size_t size, size_t nmemb, void* userData) { + auto* outputBuf = static_cast(userData); + size_t totalSize = size * nmemb; + auto buf = IOBuf::copyBuffer(ptr, totalSize); outputBuf->append(std::move(buf)); - return total_size; + return totalSize; } } // namespace -std::unique_ptr RestClient::performCurlRequest( +std::unique_ptr RestClient::invokeFunction( const std::string& fullUrl, std::unique_ptr requestPayload) { try { diff --git a/velox/functions/remote/client/RestClient.h b/velox/functions/remote/client/RestClient.h index 6fe145183a6f7..fd5c12723c4c8 100644 --- a/velox/functions/remote/client/RestClient.h +++ b/velox/functions/remote/client/RestClient.h @@ -26,14 +26,14 @@ class HttpClient { public: virtual ~HttpClient() = default; - virtual std::unique_ptr performCurlRequest( + virtual std::unique_ptr invokeFunction( const std::string& url, std::unique_ptr requestPayload) = 0; }; class RestClient : public HttpClient { public: - std::unique_ptr performCurlRequest( + std::unique_ptr invokeFunction( const std::string& url, std::unique_ptr requestPayload) override; }; diff --git a/velox/functions/remote/server/RemoteFunctionHelper.h b/velox/functions/remote/server/RemoteFunctionHelper.h new file mode 100644 index 0000000000000..d77ed47a6eb9d --- /dev/null +++ b/velox/functions/remote/server/RemoteFunctionHelper.h @@ -0,0 +1,67 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include "velox/expression/Expr.h" +#include "velox/type/Type.h" +#include "velox/type/fbhive/HiveTypeParser.h" + +namespace facebook::velox::functions { +std::string getFunctionName( + const std::string& prefix, + const std::string& functionName) { + return prefix.empty() ? functionName + : fmt::format("{}.{}", prefix, functionName); +} + +TypePtr deserializeType(const std::string& input) { + // Use hive type parser/serializer. + return type::fbhive::HiveTypeParser().parse(input); +} + +RowTypePtr deserializeArgTypes(const std::vector& argTypes) { + const size_t argCount = argTypes.size(); + + std::vector argumentTypes; + std::vector typeNames; + argumentTypes.reserve(argCount); + typeNames.reserve(argCount); + + for (size_t i = 0; i < argCount; ++i) { + argumentTypes.emplace_back(deserializeType(argTypes[i])); + typeNames.emplace_back(fmt::format("c{}", i)); + } + return ROW(std::move(typeNames), std::move(argumentTypes)); +} + +std::vector getExpressions( + const RowTypePtr& inputType, + const TypePtr& returnType, + const std::string& functionName) { + std::vector inputs; + for (size_t i = 0; i < inputType->size(); ++i) { + inputs.push_back(std::make_shared( + inputType->childAt(i), inputType->nameOf(i))); + } + + return {std::make_shared( + returnType, std::move(inputs), functionName)}; +} + +} // namespace facebook::velox::functions diff --git a/velox/functions/remote/server/RemoteFunctionRestService.cpp b/velox/functions/remote/server/RemoteFunctionRestService.cpp index 589fad41b27cb..a5c6824593936 100644 --- a/velox/functions/remote/server/RemoteFunctionRestService.cpp +++ b/velox/functions/remote/server/RemoteFunctionRestService.cpp @@ -19,7 +19,7 @@ #include #include #include "velox/expression/Expr.h" -#include "velox/type/fbhive/HiveTypeParser.h" +#include "velox/functions/remote/server/RemoteFunctionHelper.h" #include "velox/vector/VectorStream.h" namespace facebook::velox::functions { @@ -41,47 +41,6 @@ std::map internalFunctionSignatureMap = // called to use the functions mentioned in this map }; -TypePtr deserializeType(const std::string& input) { - // Use hive type parser/serializer. - return type::fbhive::HiveTypeParser().parse(input); -} - -RowTypePtr deserializeArgTypes(const std::vector& argTypes) { - const size_t argCount = argTypes.size(); - - std::vector argumentTypes; - std::vector typeNames; - argumentTypes.reserve(argCount); - typeNames.reserve(argCount); - - for (size_t i = 0; i < argCount; ++i) { - argumentTypes.emplace_back(deserializeType(argTypes[i])); - typeNames.emplace_back(fmt::format("c{}", i)); - } - return ROW(std::move(typeNames), std::move(argumentTypes)); -} - -std::vector getExpressions( - const RowTypePtr& inputType, - const TypePtr& returnType, - const std::string& functionName) { - std::vector inputs; - for (size_t i = 0; i < inputType->size(); ++i) { - inputs.push_back(std::make_shared( - inputType->childAt(i), inputType->nameOf(i))); - } - - return {std::make_shared( - returnType, std::move(inputs), functionName)}; -} - -std::string getFunctionName( - const std::string& prefix, - const std::string& functionName) { - return prefix.empty() ? functionName - : fmt::format("{}.{}", prefix, functionName); -} - } // namespace session::session( diff --git a/velox/functions/remote/server/RemoteFunctionService.cpp b/velox/functions/remote/server/RemoteFunctionService.cpp index 2cc7a0abac129..58365797bc4a9 100644 --- a/velox/functions/remote/server/RemoteFunctionService.cpp +++ b/velox/functions/remote/server/RemoteFunctionService.cpp @@ -18,54 +18,11 @@ #include "velox/common/base/Exceptions.h" #include "velox/expression/Expr.h" #include "velox/functions/remote/if/GetSerde.h" +#include "velox/functions/remote/server/RemoteFunctionHelper.h" #include "velox/type/fbhive/HiveTypeParser.h" #include "velox/vector/VectorStream.h" namespace facebook::velox::functions { -namespace { - -std::string getFunctionName( - const std::string& prefix, - const std::string& functionName) { - return prefix.empty() ? functionName - : fmt::format("{}.{}", prefix, functionName); -} - -TypePtr deserializeType(const std::string& input) { - // Use hive type parser/serializer. - return type::fbhive::HiveTypeParser().parse(input); -} - -RowTypePtr deserializeArgTypes(const std::vector& argTypes) { - const size_t argCount = argTypes.size(); - - std::vector argumentTypes; - std::vector typeNames; - argumentTypes.reserve(argCount); - typeNames.reserve(argCount); - - for (size_t i = 0; i < argCount; ++i) { - argumentTypes.emplace_back(deserializeType(argTypes[i])); - typeNames.emplace_back(fmt::format("c{}", i)); - } - return ROW(std::move(typeNames), std::move(argumentTypes)); -} - -} // namespace - -std::vector getExpressions( - const RowTypePtr& inputType, - const TypePtr& returnType, - const std::string& functionName) { - std::vector inputs; - for (size_t i = 0; i < inputType->size(); ++i) { - inputs.push_back(std::make_shared( - inputType->childAt(i), inputType->nameOf(i))); - } - - return {std::make_shared( - returnType, std::move(inputs), functionName)}; -} void RemoteFunctionServiceHandler::handleErrors( apache::thrift::field_ref result,