Skip to content

Commit

Permalink
review points fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe-Abraham committed Dec 3, 2024
1 parent 5f6bcdb commit 9d0787d
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 101 deletions.
7 changes: 3 additions & 4 deletions velox/functions/remote/client/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@
#include <folly/io/async/EventBase.h>
#include <sstream>
#include <string>

#include "velox/common/memory/ByteStream.h"
#include "velox/expression/Expr.h"
#include "velox/expression/VectorFunction.h"
#include "velox/functions/remote/client/RestClient.h"
#include "velox/functions/remote/client/ThriftClient.h"
#include "velox/functions/remote/if/GetSerde.h"
#include "velox/functions/remote/if/gen-cpp2/RemoteFunctionServiceAsyncClient.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/type/fbhive/HiveTypeSerializer.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/VectorStream.h"

#include "velox/functions/remote/client/RestClient.h"

using namespace folly;
namespace facebook::velox::functions {
namespace {
Expand Down Expand Up @@ -140,7 +139,7 @@ class RemoteFunction : public exec::VectorFunction {
metadata_.version.value_or("1"));

std::unique_ptr<IOBuf> responseBody =
restClient_->performCurlRequest(fullUrl, std::move(requestBody));
restClient_->invokeFunction(fullUrl, std::move(requestBody));

auto outputRowVector = IOBufToRowVector(
*responseBody, ROW({outputType}), *context.pool(), &serde);
Expand Down
6 changes: 3 additions & 3 deletions velox/functions/remote/client/Remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace facebook::velox::functions {

struct RemoteVectorFunctionMetadata : public exec::VectorFunctionMetadata {
/// URL of the HTTP/REST server for remote function.
/// Or Network address of the servr to communicate with. Note that this can
/// Or Network address of the server to communicate with. Note that this can
/// hold a network location (ip/port pair) or a unix domain socket path (see
/// SocketAddress::makeFromPath()).
boost::variant<folly::SocketAddress, std::string> location;
Expand Down Expand Up @@ -55,8 +55,8 @@ struct RemoteVectorFunctionMetadata : public exec::VectorFunctionMetadata {
//
/// Remote functions are registered as regular statufull functions (using the
/// same internal catalog), and hence conflict if there already exists a
/// (non-remote) function registered with the same name. The `overwrite`
/// flagwrite controls whether to overwrite in these cases.
/// (non-remote) function registered with the same name. The `overwrite` flag
/// controls whether to overwrite in these cases.
void registerRemoteFunction(
const std::string& name,
std::vector<exec::FunctionSignaturePtr> signatures,
Expand Down
12 changes: 6 additions & 6 deletions velox/functions/remote/client/RestClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@ size_t readCallback(char* dest, size_t size, size_t nmemb, void* userp) {

return totalCopied;
}
size_t writeCallback(char* ptr, size_t size, size_t nmemb, void* userdata) {
auto* outputBuf = static_cast<IOBufQueue*>(userdata);
size_t total_size = size * nmemb;
auto buf = IOBuf::copyBuffer(ptr, total_size);
size_t writeCallback(char* ptr, size_t size, size_t nmemb, void* userData) {
auto* outputBuf = static_cast<IOBufQueue*>(userData);
size_t totalSize = size * nmemb;
auto buf = IOBuf::copyBuffer(ptr, totalSize);
outputBuf->append(std::move(buf));
return total_size;
return totalSize;
}
} // namespace

std::unique_ptr<IOBuf> RestClient::performCurlRequest(
std::unique_ptr<IOBuf> RestClient::invokeFunction(
const std::string& fullUrl,
std::unique_ptr<IOBuf> requestPayload) {
try {
Expand Down
4 changes: 2 additions & 2 deletions velox/functions/remote/client/RestClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ class HttpClient {
public:
virtual ~HttpClient() = default;

virtual std::unique_ptr<folly::IOBuf> performCurlRequest(
virtual std::unique_ptr<folly::IOBuf> invokeFunction(
const std::string& url,
std::unique_ptr<folly::IOBuf> requestPayload) = 0;
};

class RestClient : public HttpClient {
public:
std::unique_ptr<folly::IOBuf> performCurlRequest(
std::unique_ptr<folly::IOBuf> invokeFunction(
const std::string& url,
std::unique_ptr<folly::IOBuf> requestPayload) override;
};
Expand Down
67 changes: 67 additions & 0 deletions velox/functions/remote/server/RemoteFunctionHelper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <string>
#include <vector>
#include "velox/expression/Expr.h"
#include "velox/type/Type.h"
#include "velox/type/fbhive/HiveTypeParser.h"

namespace facebook::velox::functions {
std::string getFunctionName(
const std::string& prefix,
const std::string& functionName) {
return prefix.empty() ? functionName
: fmt::format("{}.{}", prefix, functionName);
}

TypePtr deserializeType(const std::string& input) {
// Use hive type parser/serializer.
return type::fbhive::HiveTypeParser().parse(input);
}

RowTypePtr deserializeArgTypes(const std::vector<std::string>& argTypes) {
const size_t argCount = argTypes.size();

std::vector<TypePtr> argumentTypes;
std::vector<std::string> typeNames;
argumentTypes.reserve(argCount);
typeNames.reserve(argCount);

for (size_t i = 0; i < argCount; ++i) {
argumentTypes.emplace_back(deserializeType(argTypes[i]));
typeNames.emplace_back(fmt::format("c{}", i));
}
return ROW(std::move(typeNames), std::move(argumentTypes));
}

std::vector<core::TypedExprPtr> getExpressions(
const RowTypePtr& inputType,
const TypePtr& returnType,
const std::string& functionName) {
std::vector<core::TypedExprPtr> inputs;
for (size_t i = 0; i < inputType->size(); ++i) {
inputs.push_back(std::make_shared<core::FieldAccessTypedExpr>(
inputType->childAt(i), inputType->nameOf(i)));
}

return {std::make_shared<core::CallTypedExpr>(
returnType, std::move(inputs), functionName)};
}

} // namespace facebook::velox::functions
43 changes: 1 addition & 42 deletions velox/functions/remote/server/RemoteFunctionRestService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include <boost/beast/version.hpp>
#include <serializers/PrestoSerializer.h>
#include "velox/expression/Expr.h"
#include "velox/type/fbhive/HiveTypeParser.h"
#include "velox/functions/remote/server/RemoteFunctionHelper.h"
#include "velox/vector/VectorStream.h"

namespace facebook::velox::functions {
Expand All @@ -41,47 +41,6 @@ std::map<std::string, InternalFunctionSignature> internalFunctionSignatureMap =
// called to use the functions mentioned in this map
};

TypePtr deserializeType(const std::string& input) {
// Use hive type parser/serializer.
return type::fbhive::HiveTypeParser().parse(input);
}

RowTypePtr deserializeArgTypes(const std::vector<std::string>& argTypes) {
const size_t argCount = argTypes.size();

std::vector<TypePtr> argumentTypes;
std::vector<std::string> typeNames;
argumentTypes.reserve(argCount);
typeNames.reserve(argCount);

for (size_t i = 0; i < argCount; ++i) {
argumentTypes.emplace_back(deserializeType(argTypes[i]));
typeNames.emplace_back(fmt::format("c{}", i));
}
return ROW(std::move(typeNames), std::move(argumentTypes));
}

std::vector<core::TypedExprPtr> getExpressions(
const RowTypePtr& inputType,
const TypePtr& returnType,
const std::string& functionName) {
std::vector<core::TypedExprPtr> inputs;
for (size_t i = 0; i < inputType->size(); ++i) {
inputs.push_back(std::make_shared<core::FieldAccessTypedExpr>(
inputType->childAt(i), inputType->nameOf(i)));
}

return {std::make_shared<core::CallTypedExpr>(
returnType, std::move(inputs), functionName)};
}

std::string getFunctionName(
const std::string& prefix,
const std::string& functionName) {
return prefix.empty() ? functionName
: fmt::format("{}.{}", prefix, functionName);
}

} // namespace

session::session(
Expand Down
45 changes: 1 addition & 44 deletions velox/functions/remote/server/RemoteFunctionService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,54 +18,11 @@
#include "velox/common/base/Exceptions.h"
#include "velox/expression/Expr.h"
#include "velox/functions/remote/if/GetSerde.h"
#include "velox/functions/remote/server/RemoteFunctionHelper.h"
#include "velox/type/fbhive/HiveTypeParser.h"
#include "velox/vector/VectorStream.h"

namespace facebook::velox::functions {
namespace {

std::string getFunctionName(
const std::string& prefix,
const std::string& functionName) {
return prefix.empty() ? functionName
: fmt::format("{}.{}", prefix, functionName);
}

TypePtr deserializeType(const std::string& input) {
// Use hive type parser/serializer.
return type::fbhive::HiveTypeParser().parse(input);
}

RowTypePtr deserializeArgTypes(const std::vector<std::string>& argTypes) {
const size_t argCount = argTypes.size();

std::vector<TypePtr> argumentTypes;
std::vector<std::string> typeNames;
argumentTypes.reserve(argCount);
typeNames.reserve(argCount);

for (size_t i = 0; i < argCount; ++i) {
argumentTypes.emplace_back(deserializeType(argTypes[i]));
typeNames.emplace_back(fmt::format("c{}", i));
}
return ROW(std::move(typeNames), std::move(argumentTypes));
}

} // namespace

std::vector<core::TypedExprPtr> getExpressions(
const RowTypePtr& inputType,
const TypePtr& returnType,
const std::string& functionName) {
std::vector<core::TypedExprPtr> inputs;
for (size_t i = 0; i < inputType->size(); ++i) {
inputs.push_back(std::make_shared<core::FieldAccessTypedExpr>(
inputType->childAt(i), inputType->nameOf(i)));
}

return {std::make_shared<core::CallTypedExpr>(
returnType, std::move(inputs), functionName)};
}

void RemoteFunctionServiceHandler::handleErrors(
apache::thrift::field_ref<remote::RemoteFunctionPage&> result,
Expand Down

0 comments on commit 9d0787d

Please sign in to comment.