Skip to content

Commit

Permalink
working code
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe-Abraham committed Oct 28, 2024
1 parent e4299be commit 2f28295
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 282 deletions.
23 changes: 11 additions & 12 deletions velox/functions/remote/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,17 @@ velox_link_libraries(velox_functions_remote_rest_client ${PROXYGEN_LIBRARIES}

velox_add_library(velox_functions_remote Remote.cpp)
velox_link_libraries(
velox_functions_remote
PUBLIC velox_functions_remote_rest_client
velox_expression
velox_memory
velox_exec
velox_vector
velox_presto_serializer
velox_functions_remote_thrift_client
velox_functions_remote_get_serde
velox_type_fbhive
Folly::folly
)
velox_functions_remote
PUBLIC velox_functions_remote_rest_client
velox_expression
velox_memory
velox_exec
velox_vector
velox_presto_serializer
velox_functions_remote_thrift_client
velox_functions_remote_get_serde
velox_type_fbhive
Folly::folly)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
Expand Down
124 changes: 54 additions & 70 deletions velox/functions/remote/client/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

#include <folly/io/async/EventBase.h>
#include "velox/common/memory/ByteStream.h"
// #include "velox/common/memory/StreamArena.h"
#include "velox/exec/ExchangeQueue.h"
#include "velox/expression/Expr.h"
#include "velox/expression/VectorFunction.h"
Expand All @@ -28,15 +27,43 @@
#include "velox/functions/remote/if/gen-cpp2/RemoteFunctionServiceAsyncClient.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/type/fbhive/HiveTypeSerializer.h"
// #include "velox/vector/ComplexVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/VectorStream.h"

#include <cctype>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>

using namespace folly;
using namespace proxygen;
namespace facebook::velox::functions {
namespace {

std::string convertIOBufToHex(const folly::IOBuf* buf) {
std::string hexOutput;
for (auto range : *buf) {
// Convert range to StringPiece and hexlify it
auto byteRange = folly::ByteRange(range);
std::string tempHex = folly::hexlify(byteRange);
hexOutput += tempHex;
}
return hexOutput;
}

std::unique_ptr<folly::IOBuf> convertHexToIOBuf(const std::string& hexInput) {
// The length of the hex string should be even
VELOX_USER_CHECK(hexInput.size() % 2 == 0, "Invalid hex string length.");

// Decode the hex string into a byte array
std::vector<uint8_t> byteArray(hexInput.size() / 2);
folly::unhexlify(folly::StringPiece(hexInput), byteArray);

// Create an IOBuf from the byte array
return folly::IOBuf::copyBuffer(byteArray.data(), byteArray.size());
}

std::string serializeType(const TypePtr& type) {
// Use hive type serializer.
return type::fbhive::HiveTypeSerializer::serialize(type);
Expand Down Expand Up @@ -113,91 +140,48 @@ class RemoteFunction : public exec::VectorFunction {
exec::EvalCtx& context,
VectorPtr& result) const {
try {
// Prepare the full URL
// Prepare the full URL by encoding metadata and forming the endpoint
std::string functionId =
metadata_.functionId.value_or("default_function_id");
std::string encodedFunctionId = urlEncode(functionId);

std::string fullUrl = fmt::format(
"{}/v1/functions/{}/{}/{}/{}",
url_.getUrl(),
metadata_.schema.value_or("default_schema"),
functionName_,
encodedFunctionId,
metadata_.version.value_or("default_version"));

// Prepare headers
std::unordered_map<std::string, std::string> headers;
headers["Content-Type"] = "application/octet-stream";
headers["Accept"] = "application/octet-stream";
"{}/v1/functions/default/abs/remote.default.abs%253Binteger/1",
url_.getUrl());

// Create the RowVector from input arguments
auto remoteRowVector = std::make_shared<RowVector>(
context.pool(), remoteInputType_, BufferPtr{}, rows.end(), args);

// Create PrestoVectorSerde instance
// Serialize the input data
serializer::presto::PrestoVectorSerde serde;

// Create options for serialization if needed
serializer::presto::PrestoVectorSerde::PrestoOptions options;

// Use OStreamOutputStream for serialization
std::ostringstream out;
serializer::presto::PrestoOutputStreamListener listener;
OStreamOutputStream output(&out, &listener);

// Obtain a BatchVectorSerializer
auto batchSerializer =
serde.createBatchSerializer(context.pool(), &options);

// Serialize the vector
batchSerializer->serialize(remoteRowVector, &output);

// Get the serialized data as a string
std::string serializedData = out.str();

// Convert the serialized data into an IOBuf
auto payloadIOBuf = IOBuf::copyBuffer(
serializedData.data(), serializedData.size());

// Create a SerializedPage from the IOBuf
exec::SerializedPage requestPage(std::move(payloadIOBuf));

// Invoke the REST function with the SerializedPage
RestClient restClient(fullUrl, headers);

// Send the SerializedPage and receive the response as a SerializedPage
auto [statusCode, responsePage] = restClient.invoke_function(requestPage);
auto remoteRowVector = std::make_shared<RowVector>(
context.pool(),
remoteInputType_,
BufferPtr{},
rows.end(),
std::move(args));

// Handle HTTP response status
if (statusCode != 200) {
VELOX_FAIL(
"Error while executing remote function '{}': HTTP status code {}",
functionName_,
statusCode);
}
// Serialize the RowVector into an IOBuf (binary format)
IOBuf payload = rowVectorToIOBuf(
remoteRowVector, rows.end(), *context.pool(), &serde);

// Deserialize the response SerializedPage back into a RowVector
auto inputByteRanges =
byteRangesFromIOBuf(responsePage->getIOBuf().get());
BufferInputStream inputStream(std::move(inputByteRanges));
// Convert the binary IOBuf to a hex string to be sent to the remote
// server
std::string hexData = convertIOBufToHex(&payload);

// Prepare the output RowVectorPtr
RowVectorPtr outputRowVector;
// Send the serialized data to the remote function via RestClient
RestClient restClient(fullUrl);
std::string responseBody;
restClient.invoke_function(hexData, responseBody);

// Deserialize using PrestoVectorSerde
serde.deserialize(
&inputStream,
context.pool(),
remoteInputType_,
&outputRowVector,
nullptr);
// Convert the hex response back to binary data
auto responseIOBuf = convertHexToIOBuf(responseBody);

// Extract the result column
auto outputRowVector = IOBufToRowVector(
*responseIOBuf, ROW({outputType}), *context.pool(), &serde);
result = outputRowVector->childAt(0);

} catch (const std::exception& e) {
// Log and throw an error if the remote call fails
// Catch and handle any exceptions thrown during the process
VELOX_FAIL(
"Error while executing remote function '{}': {}",
functionName_,
Expand Down
176 changes: 28 additions & 148 deletions velox/functions/remote/client/RestClient.cpp
Original file line number Diff line number Diff line change
@@ -1,158 +1,38 @@
#include "velox/functions/remote/client/RestClient.h"
#include "velox/exec/ExchangeQueue.h"
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RestClient.h"
#include <folly/io/IOBuf.h>
#include <folly/logging/xlog.h>
#include <proxygen/lib/http/session/HTTPTransaction.h>

namespace facebook::velox::functions {

//
// RestClient Implementation
//
RestClient::RestClient(const std::string& url)

RestClient::RestClient(
const std::string& url,
const std::unordered_map<std::string, std::string>& headers)
: url_(proxygen::URL(url)), headers_(headers) {
: url_(URL(url)) {
httpClient_ = std::make_shared<HttpClient>(url_);
}

std::pair<int, std::unique_ptr<exec::SerializedPage>>
RestClient::invoke_function(exec::SerializedPage& requestPage) {
httpClient_->setHeaders(headers_);
httpClient_->send(requestPage);

// Retrieve the response page as a unique_ptr
auto responsePage = httpClient_->getResponsePage();

int statusCode = httpClient_->getResponseCode();

return {statusCode, std::move(responsePage)};
}

//
// HttpClient Implementation
//

HttpClient::HttpClient(const proxygen::URL& url)
: url_(url), responseCode_(0) {}

void HttpClient::setHeaders(
const std::unordered_map<std::string, std::string>& headers) {
headers_ = headers;
}

void HttpClient::send(const exec::SerializedPage& serializedPage) {
// Get the IOBuf from SerializedPage
requestBodyIOBuf_ = serializedPage.getIOBuf();

responseBodyIOBuf_.reset();
responseCode_ = 0;

// Reset connector and session for resending the request
connector_.reset();
session_.reset();

// Create a new connector for the request
connector_ = std::make_unique<proxygen::HTTPConnector>(
this, proxygen::WheelTimerInstance(std::chrono::milliseconds(1000)));

// Initiate connection
connector_->connect(
&evb_,
folly::SocketAddress(url_.getHost(), url_.getPort(), true),
std::chrono::milliseconds(10000));

// Run the event loop until we explicitly terminate it
evb_.loopForever();
}

std::unique_ptr<exec::SerializedPage> HttpClient::getResponsePage() {
if (responseBodyIOBuf_) {
// Construct SerializedPage using the response IOBuf
return std::make_unique<exec::SerializedPage>(
std::move(responseBodyIOBuf_));
} else {
// Return nullptr or handle error
return nullptr;
}
}

int HttpClient::getResponseCode() const {
return responseCode_;
}

// HTTPConnector::Callback methods
void HttpClient::connectSuccess(
proxygen::HTTPUpstreamSession* session) noexcept {
session_ = std::shared_ptr<proxygen::HTTPUpstreamSession>(
session, [](proxygen::HTTPUpstreamSession* /*s*/) {
// No-op deleter, session is managed by Proxygen
});
sendRequest();
}

void HttpClient::connectError(
const folly::AsyncSocketException& ex) noexcept {
LOG(ERROR) << "Failed to connect: " << ex.what();
evb_.terminateLoopSoon();
}

// HTTPTransactionHandler methods
void HttpClient::setTransaction(
proxygen::HTTPTransaction* txn) noexcept {
txn_ = txn;
}

void HttpClient::detachTransaction() noexcept {
txn_ = nullptr;
session_.reset();
evb_.terminateLoopSoon();
}

void HttpClient::onHeadersComplete(
std::unique_ptr<proxygen::HTTPMessage> msg) noexcept {
responseCode_ = msg->getStatusCode();
}

void HttpClient::onBody(
std::unique_ptr<folly::IOBuf> chain) noexcept {
if (chain) {
if (responseBodyIOBuf_) {
responseBodyIOBuf_->prependChain(std::move(chain));
} else {
responseBodyIOBuf_ = std::move(chain);
}
}
}

void HttpClient::onEOM() noexcept {
evb_.terminateLoopSoon();
}

void HttpClient::onError(
const proxygen::HTTPException& error) noexcept {
LOG(ERROR) << "HTTP Error: " << error.what();
evb_.terminateLoopSoon();
}

void HttpClient::sendRequest() {
auto txn = session_->newTransaction(this);
if (!txn) {
LOG(ERROR) << "Failed to create new transaction";
evb_.terminateLoopSoon();
return;
}

proxygen::HTTPMessage req;
req.setMethod(proxygen::HTTPMethod::POST);
req.setURL(url_.makeRelativeURL());

req.getHeaders().add("Host", url_.getHostAndPort());
for (const auto& header : headers_) {
req.getHeaders().add(header.first, header.second);
}

txn->sendHeaders(req);
txn->sendBody(std::move(requestBodyIOBuf_));
txn->sendEOM();
}
void RestClient::invoke_function(
const std::string& requestBody,
std::string& responseBody) const {
httpClient_->send(requestBody);
responseBody = httpClient_->getResponseBody();
LOG(INFO) << responseBody;
};

} // namespace facebook::velox::functions
Loading

0 comments on commit 2f28295

Please sign in to comment.