Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc: Remote vs. Local Velox function benchmark #11539

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions velox/functions/remote/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@
add_subdirectory(if)
add_subdirectory(client)
add_subdirectory(server)
add_subdirectory(utils)
add_subdirectory(benchmarks)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmarks must be enclosed under the VELOX_ENABLE_BENCHMARKS CMake option.
CC: @amitkdutta
@czentgr is opening a fix.

29 changes: 29 additions & 0 deletions velox/functions/remote/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

add_executable(velox_benchmark_local_remote_comparison
LocalRemoteComparisonBenchmark.cpp)

target_link_libraries(
velox_benchmark_local_remote_comparison
PUBLIC velox_type
velox_benchmark_builder
velox_vector_test_lib
velox_function_registry
velox_functions_remote
velox_functions_remote_utils
Folly::folly
Folly::follybenchmark
gflags::gflags
glog::glog)
125 changes: 125 additions & 0 deletions velox/functions/remote/benchmarks/LocalRemoteComparisonBenchmark.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <folly/Benchmark.h>
#include <folly/init/Init.h>

#include "velox/benchmarks/ExpressionBenchmarkBuilder.h"
#include "velox/functions/Registerer.h"
#include "velox/functions/prestosql/Arithmetic.h"
#include "velox/functions/prestosql/StringFunctions.h"
#include "velox/functions/prestosql/URLFunctions.h"
#include "velox/functions/remote/client/Remote.h"
#include "velox/functions/remote/utils/RemoteFunctionServiceProvider.h"
#include "velox/type/Type.h"

using namespace facebook::velox;
using namespace facebook::velox::functions;

DEFINE_int32(batch_size, 1000, "Batch size for benchmarks");

int main(int argc, char** argv) {
folly::Init init(&argc, &argv);
memory::MemoryManager::initialize({});

auto param = startLocalThriftServiceAndGetParams();
RemoteVectorFunctionMetadata metadata;
metadata.location = param.serverAddress;

// Register the remote adapter for PlusFunction
auto plusSignatures = {exec::FunctionSignatureBuilder()
.returnType("bigint")
.argumentType("bigint")
.argumentType("bigint")
.build()};
registerRemoteFunction("remote_plus", plusSignatures, metadata);
// Registers the actual function under a different prefix. This is only
// needed when thrift service runs in the same process.
registerFunction<PlusFunction, int64_t, int64_t, int64_t>(
{param.functionPrefix + ".remote_plus"});
// register this function again, because the benchmark builder somehow doesn't
// recognize the function registered above (remote.xxx).
registerFunction<PlusFunction, int64_t, int64_t, int64_t>({"plus"});

// Register the remote adapter for SubstrFunction
auto substrSignatures = {exec::FunctionSignatureBuilder()
.returnType("varchar")
.argumentType("varchar")
.argumentType("integer")
.build()};
registerRemoteFunction("remote_substr", substrSignatures, metadata);
registerFunction<SubstrFunction, Varchar, Varchar, int32_t>(
{param.functionPrefix + ".remote_substr"});
registerFunction<SubstrFunction, Varchar, Varchar, int32_t>({"substr"});

// Register the remote adapter for UrlEncodeFunction
auto urlSignatures = {exec::FunctionSignatureBuilder()
.returnType("varchar")
.argumentType("varchar")
.build()};
registerRemoteFunction("remote_url_encode", urlSignatures, metadata);
registerFunction<UrlEncodeFunction, Varchar, Varchar>(
{param.functionPrefix + ".remote_url_encode"});
registerFunction<UrlEncodeFunction, Varchar, Varchar>({"url_encode"});

ExpressionBenchmarkBuilder benchmarkBuilder;

VectorFuzzer::Options opts;
opts.vectorSize = FLAGS_batch_size;
opts.stringVariableLength = true;
opts.containerVariableLength = true;
VectorFuzzer fuzzer(opts, benchmarkBuilder.pool());
auto vectorMaker = benchmarkBuilder.vectorMaker();

// benchmark comparaing PlusFunction running locally (same thread)
// and running with RemoteFunction (different threads).
benchmarkBuilder
.addBenchmarkSet(
"local_vs_remote_plus",
vectorMaker.rowVector(
{"c0", "c1"},
{fuzzer.fuzzFlat(BIGINT()), fuzzer.fuzzFlat(BIGINT())}))
.addExpression("local_plus", "plus(c0, c1) ")
.addExpression("remote_plus", "remote_plus(c0, c1) ")
.withIterations(1000);

// benchmark comparaing SubstrFunction running locally (same thread)
// and running with RemoteFunction (different threads).
benchmarkBuilder
.addBenchmarkSet(
"local_vs_remote_substr",
vectorMaker.rowVector(
{"c0", "c1"},
{fuzzer.fuzzFlat(VARCHAR()), fuzzer.fuzzFlat(INTEGER())}))
.addExpression("local_substr", "substr(c0, c1) ")
.addExpression("remote_substr", "remote_substr(c0, c1) ")
.withIterations(1000);

// benchmark comparaing UrlEncodeFunction running locally (same thread)
// and running with RemoteFunction (different threads).
benchmarkBuilder
.addBenchmarkSet(
"local_vs_remote_url_encode",
vectorMaker.rowVector({"c0"}, {fuzzer.fuzzFlat(VARCHAR())}))
.addExpression("local_url_encode", "url_encode(c0) ")
.addExpression("remote_url_encode", "remote_url_encode(c0) ")
.withIterations(1000);

benchmarkBuilder.registerBenchmarks();
benchmarkBuilder.testBenchmarks();
folly::runBenchmarks();
return 0;
}
1 change: 1 addition & 0 deletions velox/functions/remote/client/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ target_link_libraries(
velox_function_registry
velox_functions_test_lib
velox_exec_test_lib
velox_functions_remote_utils
GTest::gmock
GTest::gtest
GTest::gtest_main)
69 changes: 10 additions & 59 deletions velox/functions/remote/client/tests/RemoteFunctionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "velox/functions/remote/client/Remote.h"
#include "velox/functions/remote/if/gen-cpp2/RemoteFunctionService.h"
#include "velox/functions/remote/server/RemoteFunctionService.h"
#include "velox/functions/remote/utils/RemoteFunctionServiceProvider.h"
#include "velox/serializers/PrestoSerializer.h"

using ::apache::thrift::ThriftServer;
Expand Down Expand Up @@ -73,19 +74,19 @@ class RemoteFunctionTest
public ::testing::WithParamInterface<remote::PageFormat> {
public:
void SetUp() override {
initializeServer();
registerRemoteFunctions();
auto params = startLocalThriftServiceAndGetParams();
registerRemoteFunctions(params);
}

void TearDown() override {
OpaqueType::clearSerializationRegistry();
}

// Registers a few remote functions to be used in this test.
void registerRemoteFunctions() {
void registerRemoteFunctions(RemoteFunctionServiceParams params) {
RemoteVectorFunctionMetadata metadata;
metadata.serdeFormat = GetParam();
metadata.location = location_;
metadata.location = params.serverAddress;

// Register the remote adapter.
auto plusSignatures = {exec::FunctionSignatureBuilder()
Expand Down Expand Up @@ -129,70 +130,20 @@ class RemoteFunctionTest
// Registers the actual function under a different prefix. This is only
// needed for tests since the thrift service runs in the same process.
registerFunction<PlusFunction, int64_t, int64_t, int64_t>(
{remotePrefix_ + ".remote_plus"});
{params.functionPrefix + ".remote_plus"});
registerFunction<FailFunction, UnknownValue, int32_t, Varchar>(
{remotePrefix_ + ".remote_fail"});
{params.functionPrefix + ".remote_fail"});
registerFunction<CheckedDivideFunction, double, double, double>(
{remotePrefix_ + ".remote_divide"});
{params.functionPrefix + ".remote_divide"});
registerFunction<SubstrFunction, Varchar, Varchar, int32_t>(
{remotePrefix_ + ".remote_substr"});
{params.functionPrefix + ".remote_substr"});
registerFunction<OpaqueTypeFunction, int64_t, std::shared_ptr<Foo>>(
{remotePrefix_ + ".remote_opaque"});
{params.functionPrefix + ".remote_opaque"});

registerOpaqueType<Foo>("Foo");
OpaqueType::registerSerialization<Foo>(
"Foo", Foo::serialize, Foo::deserialize);
}

void initializeServer() {
auto handler =
std::make_shared<RemoteFunctionServiceHandler>(remotePrefix_);
server_ = std::make_shared<ThriftServer>();
server_->setInterface(handler);
server_->setAddress(location_);

thread_ = std::make_unique<std::thread>([&] { server_->serve(); });
VELOX_CHECK(waitForRunning(), "Unable to initialize thrift server.");
LOG(INFO) << "Thrift server is up and running in local port " << location_;
}

~RemoteFunctionTest() {
server_->stop();
thread_->join();
LOG(INFO) << "Thrift server stopped.";
}

private:
// Loop until the server is up and running.
bool waitForRunning() {
for (size_t i = 0; i < 100; ++i) {
if (server_->getServerStatus() == ThriftServer::ServerStatus::RUNNING) {
return true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
return false;
}

std::shared_ptr<apache::thrift::ThriftServer> server_;
std::unique_ptr<std::thread> thread_;

// Creates a random temporary file name to use to communicate as a unix domain
// socket.
folly::SocketAddress location_ = []() {
char name[] = "/tmp/socketXXXXXX";
int fd = mkstemp(name);
if (fd < 0) {
throw std::runtime_error("Failed to create temporary file for socket");
}
close(fd);
std::string socketPath(name);
// Cleanup existing socket file if it exists.
unlink(socketPath.c_str());
return folly::SocketAddress::makeFromPath(socketPath);
}();

const std::string remotePrefix_{"remote"};
};

TEST_P(RemoteFunctionTest, simple) {
Expand Down
19 changes: 19 additions & 0 deletions velox/functions/remote/utils/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

add_library(velox_functions_remote_utils RemoteFunctionServiceProvider.cpp)

target_link_libraries(
velox_functions_remote_utils
PUBLIC velox_functions_remote_server)
76 changes: 76 additions & 0 deletions velox/functions/remote/utils/RemoteFunctionServiceProvider.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/functions/remote/utils/RemoteFunctionServiceProvider.h"

#include "thrift/lib/cpp2/server/ThriftServer.h"
#include "velox/functions/remote/server/RemoteFunctionService.h"

namespace facebook::velox::functions {

RemoteFunctionServiceParams
RemoteFunctionServiceProviderForLocalThrift::getRemoteFunctionServiceParams() {
folly::call_once(initializeServiceFlag_, [&]() { initializeServer(); });
return RemoteFunctionServiceParams{
remotePrefix_,
location_,
};
}

void RemoteFunctionServiceProviderForLocalThrift::initializeServer() {
auto handler =
std::make_shared<velox::functions::RemoteFunctionServiceHandler>(
remotePrefix_);
server_ = std::make_shared<apache::thrift::ThriftServer>();
server_->setInterface(handler);
server_->setAddress(location_);

thread_ = std::make_unique<std::thread>([&] { server_->serve(); });
VELOX_CHECK(waitForRunning(), "Unable to initialize thrift server.");
LOG(INFO) << "Thrift server is up and running in local port " << location_;
}

bool RemoteFunctionServiceProviderForLocalThrift::waitForRunning() {
for (size_t i = 0; i < 100; ++i) {
if (server_->getServerStatus() ==
apache::thrift::ThriftServer::ServerStatus::RUNNING) {
return true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
return false;
}

RemoteFunctionServiceProviderForLocalThrift::
~RemoteFunctionServiceProviderForLocalThrift() {
server_->stop();
thread_->join();
LOG(INFO) << "Thrift server stopped.";
}

RemoteFunctionServiceParams startLocalThriftServiceAndGetParams() {
static folly::Singleton<IRemoteFunctionServiceProvider>
remoteFunctionServiceProviderForLocalThriftSingleton{
[]() { return new RemoteFunctionServiceProviderForLocalThrift(); }};
auto provider =
remoteFunctionServiceProviderForLocalThriftSingleton.try_get();
if (!provider) {
throw std::runtime_error("local remoteFunctionProvider is not available");
}
return provider->getRemoteFunctionServiceParams();
}

} // namespace facebook::velox::functions
Loading
Loading