Skip to content

Commit

Permalink
Remove proxygen
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe-Abraham committed Nov 12, 2024
1 parent f63705f commit 8609347
Show file tree
Hide file tree
Showing 13 changed files with 464 additions and 397 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ option(VELOX_ENABLE_ABFS "Build Abfs Connector" OFF)
option(VELOX_ENABLE_HDFS "Build Hdfs Connector" OFF)
option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF)
option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF)
option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF)
option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" ON)
option(VELOX_ENABLE_CCACHE "Use ccache if installed." ON)

option(VELOX_BUILD_TEST_UTILS "Builds Velox test utilities" OFF)
Expand Down
21 changes: 0 additions & 21 deletions velox/functions/remote/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

if(NOT DEFINED PROXYGEN_LIBRARIES)
find_package(Sodium REQUIRED)

find_library(PROXYGEN proxygen)
find_library(PROXYGEN_HTTP_SERVER proxygenhttpserver)
find_library(FIZZ fizz)
find_library(WANGLE wangle)

if(NOT PROXYGEN
OR NOT PROXYGEN_HTTP_SERVER
OR NOT FIZZ
OR NOT WANGLE)
message(
FATAL_ERROR
"One or more proxygen libraries were not found. Please ensure proxygen, proxygenhttpserver, fizz, and wangle are installed."
)
endif()

set(PROXYGEN_LIBRARIES ${PROXYGEN_HTTP_SERVER} ${PROXYGEN} ${WANGLE} ${FIZZ})
endif()

add_subdirectory(if)
add_subdirectory(client)
add_subdirectory(server)
10 changes: 5 additions & 5 deletions velox/functions/remote/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@ velox_add_library(velox_functions_remote_thrift_client ThriftClient.cpp)
velox_link_libraries(velox_functions_remote_thrift_client
PUBLIC remote_function_thrift FBThrift::thriftcpp2)

velox_add_library(velox_functions_remote_rest_client RestClient.cpp)
velox_link_libraries(velox_functions_remote_rest_client ${PROXYGEN_LIBRARIES}
velox_exec Folly::folly)
add_library(velox_functions_remote_rest_client RestClient.cpp)
target_link_libraries(
velox_functions_remote_rest_client Folly::folly CURL::libcurl)

velox_add_library(velox_functions_remote Remote.cpp)
velox_link_libraries(
velox_functions_remote
PUBLIC velox_functions_remote_rest_client
velox_expression
PUBLIC velox_expression
velox_memory
velox_exec
velox_vector
velox_presto_serializer
velox_functions_remote_thrift_client
velox_functions_remote_rest_client
velox_functions_remote_get_serde
velox_type_fbhive
Folly::folly)
Expand Down
60 changes: 27 additions & 33 deletions velox/functions/remote/client/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include "velox/common/memory/ByteStream.h"
#include "velox/expression/Expr.h"
#include "velox/expression/VectorFunction.h"
#include "velox/functions/remote/client/RestClient.h"
#include "velox/functions/remote/client/ThriftClient.h"
#include "velox/functions/remote/if/GetSerde.h"
#include "velox/functions/remote/if/gen-cpp2/RemoteFunctionServiceAsyncClient.h"
Expand All @@ -29,18 +28,17 @@
#include "velox/vector/FlatVector.h"
#include "velox/vector/VectorStream.h"

#include <cctype>
#include <iomanip>
#include <fmt/format.h>
#include <sstream>
#include <string>

#include "RestClient.h"

using namespace folly;
using namespace proxygen;
namespace facebook::velox::functions {
namespace {

std::string serializeType(const TypePtr& type) {
// Use hive type serializer.
return type::fbhive::HiveTypeSerializer::serialize(type);
}

Expand Down Expand Up @@ -72,13 +70,16 @@ class RemoteFunction : public exec::VectorFunction {
RemoteFunction(
const std::string& functionName,
const std::vector<exec::VectorFunctionArg>& inputArgs,
const RemoteVectorFunctionMetadata& metadata)
: functionName_(functionName), metadata_(metadata) {
const RemoteVectorFunctionMetadata& metadata,
std::unique_ptr<HttpClient> httpClient = nullptr)
: functionName_(functionName),
restClient_(httpClient ? std::move(httpClient) : getRestClient()),
metadata_(metadata) {
if (metadata.location.type() == typeid(SocketAddress)) {
location_ = boost::get<SocketAddress>(metadata.location);
thriftClient_ = getThriftClient(location_, &eventBase_);
} else if (metadata.location.type() == typeid(URL)) {
url_ = boost::get<URL>(metadata.location);
} else if (metadata.location.type() == typeid(std::string)) {
url_ = boost::get<std::string>(metadata.location);
}

std::vector<TypePtr> types;
Expand All @@ -101,7 +102,7 @@ class RemoteFunction : public exec::VectorFunction {
try {
if ((metadata_.location.type() == typeid(SocketAddress))) {
applyRemote(rows, args, outputType, context, result);
} else if (metadata_.location.type() == typeid(URL)) {
} else if (metadata_.location.type() == typeid(std::string)) {
applyRestRemote(rows, args, outputType, context, result);
}
} catch (const VeloxRuntimeError&) {
Expand All @@ -119,41 +120,34 @@ class RemoteFunction : public exec::VectorFunction {
exec::EvalCtx& context,
VectorPtr& result) const {
try {
std::string fullUrl = fmt::format(
"{}/v1/functions/{}/{}/{}/{}",
url_.getUrl(),
metadata_.schema.value_or("default"),
extractFunctionName(functionName_),
urlEncode(metadata_.functionId.value_or("default_function_id")),
metadata_.version.value_or("1"));

// Serialize the input data
serializer::presto::PrestoVectorSerde serde;
serializer::presto::PrestoVectorSerde::PrestoOptions options;

auto remoteRowVector = std::make_shared<RowVector>(
context.pool(),
remoteInputType_,
BufferPtr{},
rows.end(),
std::move(args));

// Serialize the RowVector into an IOBuf (binary format)
IOBuf payload = rowVectorToIOBuf(
remoteRowVector, rows.end(), *context.pool(), &serde);
std::unique_ptr<IOBuf> requestBody =
std::make_unique<IOBuf>(rowVectorToIOBuf(
remoteRowVector, rows.end(), *context.pool(), &serde));

// Send the serialized data to the remote function via RestClient
RestClient restClient(fullUrl);
std::unique_ptr<IOBuf> responseBody;
restClient.invoke_function(
std::make_unique<IOBuf>(std::move(payload)), (responseBody));
const std::string fullUrl = fmt::format(
"{}/v1/functions/{}/{}/{}/{}",
url_,
metadata_.schema.value_or("default"),
extractFunctionName(functionName_),
urlEncode(metadata_.functionId.value_or("default_function_id")),
metadata_.version.value_or("1"));

std::unique_ptr<IOBuf> responseBody =
restClient_->performCurlRequest(fullUrl, std::move(requestBody));

auto outputRowVector = IOBufToRowVector(
*responseBody, ROW({outputType}), *context.pool(), &serde);
result = outputRowVector->childAt(0);

result = outputRowVector->childAt(0);
} catch (const std::exception& e) {
// Catch and handle any exceptions thrown during the process
VELOX_FAIL(
"Error while executing remote function '{}': {}",
functionName_,
Expand Down Expand Up @@ -238,11 +232,11 @@ class RemoteFunction : public exec::VectorFunction {
}

const std::string functionName_;

EventBase eventBase_;
std::unique_ptr<RemoteFunctionClient> thriftClient_;
std::unique_ptr<HttpClient> restClient_;
SocketAddress location_;
URL url_;
std::string url_;
RowTypePtr remoteInputType_;
std::vector<std::string> serializedInputTypes_;
const RemoteVectorFunctionMetadata metadata_;
Expand Down
3 changes: 1 addition & 2 deletions velox/functions/remote/client/Remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

#include <boost/variant.hpp>
#include <folly/SocketAddress.h>
#include <proxygen/lib/utils/URL.h>
#include "velox/expression/VectorFunction.h"
#include "velox/functions/remote/if/gen-cpp2/RemoteFunction_types.h"

Expand All @@ -29,7 +28,7 @@ struct RemoteVectorFunctionMetadata : public exec::VectorFunctionMetadata {
/// Or Network address of the servr to communicate with. Note that this can
/// hold a network location (ip/port pair) or a unix domain socket path (see
/// SocketAddress::makeFromPath()).
boost::variant<folly::SocketAddress, proxygen::URL> location;
boost::variant<folly::SocketAddress, std::string> location;

/// The serialization format to be used when sending data to the remote.
remote::PageFormat serdeFormat{remote::PageFormat::PRESTO_PAGE};
Expand Down
103 changes: 91 additions & 12 deletions velox/functions/remote/client/RestClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,101 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RestClient.h"
#include <folly/io/IOBuf.h>
#include <folly/logging/xlog.h>

#include "velox/functions/remote/client/RestClient.h"

#include <curl/curl.h>
#include <folly/io/IOBufQueue.h>
#include <simdjson.h>

#include "velox/common/base/Exceptions.h"

using namespace folly;
namespace facebook::velox::functions {
namespace {
size_t readCallback(char* dest, size_t size, size_t nmemb, void* userp) {
auto* inputBufQueue = static_cast<IOBufQueue*>(userp);
size_t bufferSize = size * nmemb;
size_t totalCopied = 0;

// RestClient Implementation
RestClient::RestClient(const std::string& url) : url_(URL(url)) {
httpClient_ = std::make_shared<HttpClient>(url_);
while (totalCopied < bufferSize && !inputBufQueue->empty()) {
auto buf = inputBufQueue->front();
size_t remainingSize = bufferSize - totalCopied;
size_t copySize = std::min(remainingSize, buf->length());
std::memcpy(dest + totalCopied, buf->data(), copySize);
totalCopied += copySize;
inputBufQueue->pop_front();
}

return totalCopied;
}
size_t writeCallback(char* ptr, size_t size, size_t nmemb, void* userdata) {
auto* outputBuf = static_cast<IOBufQueue*>(userdata);
size_t total_size = size * nmemb;
auto buf = IOBuf::copyBuffer(ptr, total_size);
outputBuf->append(std::move(buf));
return total_size;
}
} // namespace

std::unique_ptr<IOBuf> RestClient::performCurlRequest(
const std::string& fullUrl,
std::unique_ptr<IOBuf> requestPayload) {
try {
IOBufQueue inputBufQueue(IOBufQueue::cacheChainLength());
inputBufQueue.append(std::move(requestPayload));

CURL* curl = curl_easy_init();
if (!curl) {
VELOX_FAIL(fmt::format(
"Error initializing CURL: {}",
curl_easy_strerror(CURLE_FAILED_INIT)));
}

curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
curl_easy_setopt(curl, CURLOPT_POST, 1L);
curl_easy_setopt(curl, CURLOPT_READFUNCTION, readCallback);
curl_easy_setopt(curl, CURLOPT_READDATA, &inputBufQueue);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeCallback);

IOBufQueue outputBuf(IOBufQueue::cacheChainLength());
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &outputBuf);
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);

void RestClient::invoke_function(
std::unique_ptr<IOBuf> requestBody,
std::unique_ptr<IOBuf>& responseBody) const {
httpClient_->send(std::move(requestBody));
responseBody = httpClient_->getResponseBody();
};
struct curl_slist* headers = nullptr;
headers =
curl_slist_append(headers, "Content-Type: application/X-presto-pages");
headers = curl_slist_append(headers, "Accept: application/X-presto-pages");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);

curl_easy_setopt(
curl,
CURLOPT_POSTFIELDSIZE,
static_cast<long>(inputBufQueue.chainLength()));

CURLcode res = curl_easy_perform(curl);
if (res != CURLE_OK) {
curl_slist_free_all(headers);
curl_easy_cleanup(curl);
VELOX_FAIL(fmt::format(
"Error communicating with server: {}\nURL: {}\nCURL Error: {}",
curl_easy_strerror(res),
fullUrl.c_str(),
curl_easy_strerror(res)));
}

curl_slist_free_all(headers);
curl_easy_cleanup(curl);

return outputBuf.move();

} catch (const std::exception& e) {
VELOX_FAIL(fmt::format("Exception during CURL request: {}", e.what()));
}
}

std::unique_ptr<HttpClient> getRestClient() {
return std::make_unique<RestClient>();
}

} // namespace facebook::velox::functions
Loading

0 comments on commit 8609347

Please sign in to comment.