Skip to content

Commit

Permalink
feat: Create a local thrift service provider for RemoteFunction (#11538)
Browse files Browse the repository at this point in the history
Summary:

This diff is to extract the common logic related to starting RemoteFunctionService to a single place, so that we can later reuse it. 
- Create a Class RemoteFunctionServiceProvider and extract the logic related to starting a local thrift server for RemoteFunctionService there. 
- Create a Singleton remoteFunctionServiceProviderSingleton so that the server is only started once per process.
- Expose helper methods `getRemoteFunctionServiceParamsForLocalThrift()`
- Use this in RemoteFunctionTest.cpp

Reviewed By: pedroerp

Differential Revision: D65927269
  • Loading branch information
emilysun201309 authored and facebook-github-bot committed Dec 17, 2024
1 parent e46cb76 commit bdcaaf1
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 59 deletions.
1 change: 1 addition & 0 deletions velox/functions/remote/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
add_subdirectory(if)
add_subdirectory(client)
add_subdirectory(server)
add_subdirectory(utils)
1 change: 1 addition & 0 deletions velox/functions/remote/client/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ target_link_libraries(
velox_function_registry
velox_functions_test_lib
velox_exec_test_lib
velox_functions_remote_utils
GTest::gmock
GTest::gtest
GTest::gtest_main)
69 changes: 10 additions & 59 deletions velox/functions/remote/client/tests/RemoteFunctionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "velox/functions/remote/client/Remote.h"
#include "velox/functions/remote/if/gen-cpp2/RemoteFunctionService.h"
#include "velox/functions/remote/server/RemoteFunctionService.h"
#include "velox/functions/remote/utils/RemoteFunctionServiceProvider.h"
#include "velox/serializers/PrestoSerializer.h"

using ::apache::thrift::ThriftServer;
Expand Down Expand Up @@ -73,19 +74,19 @@ class RemoteFunctionTest
public ::testing::WithParamInterface<remote::PageFormat> {
public:
void SetUp() override {
initializeServer();
registerRemoteFunctions();
auto params = startLocalThriftServiceAndGetParams();
registerRemoteFunctions(params);
}

void TearDown() override {
OpaqueType::clearSerializationRegistry();
}

// Registers a few remote functions to be used in this test.
void registerRemoteFunctions() {
void registerRemoteFunctions(RemoteFunctionServiceParams params) {
RemoteVectorFunctionMetadata metadata;
metadata.serdeFormat = GetParam();
metadata.location = location_;
metadata.location = params.serverAddress;

// Register the remote adapter.
auto plusSignatures = {exec::FunctionSignatureBuilder()
Expand Down Expand Up @@ -129,70 +130,20 @@ class RemoteFunctionTest
// Registers the actual function under a different prefix. This is only
// needed for tests since the thrift service runs in the same process.
registerFunction<PlusFunction, int64_t, int64_t, int64_t>(
{remotePrefix_ + ".remote_plus"});
{params.functionPrefix + ".remote_plus"});
registerFunction<FailFunction, UnknownValue, int32_t, Varchar>(
{remotePrefix_ + ".remote_fail"});
{params.functionPrefix + ".remote_fail"});
registerFunction<CheckedDivideFunction, double, double, double>(
{remotePrefix_ + ".remote_divide"});
{params.functionPrefix + ".remote_divide"});
registerFunction<SubstrFunction, Varchar, Varchar, int32_t>(
{remotePrefix_ + ".remote_substr"});
{params.functionPrefix + ".remote_substr"});
registerFunction<OpaqueTypeFunction, int64_t, std::shared_ptr<Foo>>(
{remotePrefix_ + ".remote_opaque"});
{params.functionPrefix + ".remote_opaque"});

registerOpaqueType<Foo>("Foo");
OpaqueType::registerSerialization<Foo>(
"Foo", Foo::serialize, Foo::deserialize);
}

void initializeServer() {
auto handler =
std::make_shared<RemoteFunctionServiceHandler>(remotePrefix_);
server_ = std::make_shared<ThriftServer>();
server_->setInterface(handler);
server_->setAddress(location_);

thread_ = std::make_unique<std::thread>([&] { server_->serve(); });
VELOX_CHECK(waitForRunning(), "Unable to initialize thrift server.");
LOG(INFO) << "Thrift server is up and running in local port " << location_;
}

~RemoteFunctionTest() {
server_->stop();
thread_->join();
LOG(INFO) << "Thrift server stopped.";
}

private:
// Loop until the server is up and running.
bool waitForRunning() {
for (size_t i = 0; i < 100; ++i) {
if (server_->getServerStatus() == ThriftServer::ServerStatus::RUNNING) {
return true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
return false;
}

std::shared_ptr<apache::thrift::ThriftServer> server_;
std::unique_ptr<std::thread> thread_;

// Creates a random temporary file name to use to communicate as a unix domain
// socket.
folly::SocketAddress location_ = []() {
char name[] = "/tmp/socketXXXXXX";
int fd = mkstemp(name);
if (fd < 0) {
throw std::runtime_error("Failed to create temporary file for socket");
}
close(fd);
std::string socketPath(name);
// Cleanup existing socket file if it exists.
unlink(socketPath.c_str());
return folly::SocketAddress::makeFromPath(socketPath);
}();

const std::string remotePrefix_{"remote"};
};

TEST_P(RemoteFunctionTest, simple) {
Expand Down
19 changes: 19 additions & 0 deletions velox/functions/remote/utils/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

add_library(velox_functions_remote_utils RemoteFunctionServiceProvider.cpp)

target_link_libraries(
velox_functions_remote_utils
PUBLIC velox_functions_remote_server)
74 changes: 74 additions & 0 deletions velox/functions/remote/utils/RemoteFunctionServiceProvider.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/functions/remote/utils/RemoteFunctionServiceProvider.h"

#include "thrift/lib/cpp2/server/ThriftServer.h"
#include "velox/functions/remote/server/RemoteFunctionService.h"

namespace facebook::velox::functions {

RemoteFunctionServiceParams
RemoteFunctionServiceProviderForLocalThrift::getRemoteFunctionServiceParams() {
folly::call_once(initializeServiceFlag_, [&]() { initializeServer(); });
return RemoteFunctionServiceParams{
remotePrefix_,
location_,
};
}

void RemoteFunctionServiceProviderForLocalThrift::initializeServer() {
auto handler =
std::make_shared<velox::functions::RemoteFunctionServiceHandler>(
remotePrefix_);
server_ = std::make_shared<apache::thrift::ThriftServer>();
server_->setInterface(handler);
server_->setAddress(location_);

thread_ = std::make_unique<std::thread>([&] { server_->serve(); });
VELOX_CHECK(waitForRunning(), "Unable to initialize thrift server.");
LOG(INFO) << "Thrift server is up and running in local port " << location_;
}

bool RemoteFunctionServiceProviderForLocalThrift::waitForRunning() {
for (size_t i = 0; i < 100; ++i) {
if (server_->getServerStatus() ==
apache::thrift::ThriftServer::ServerStatus::RUNNING) {
return true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
return false;
}

RemoteFunctionServiceProviderForLocalThrift::
~RemoteFunctionServiceProviderForLocalThrift() {
server_->stop();
thread_->join();
LOG(INFO) << "Thrift server stopped.";
}

RemoteFunctionServiceParams startLocalThriftServiceAndGetParams() {
static folly::Singleton<IRemoteFunctionServiceProvider>
remoteFunctionServiceProviderForLocalThriftSingleton{
[]() { return new RemoteFunctionServiceProviderForLocalThrift(); }};
auto provider =
remoteFunctionServiceProviderForLocalThriftSingleton.try_get();
VELOX_CHECK(provider, "local remoteFunctionProvider is not available");
return provider->getRemoteFunctionServiceParams();
}

} // namespace facebook::velox::functions
91 changes: 91 additions & 0 deletions velox/functions/remote/utils/RemoteFunctionServiceProvider.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
#include <string>
#include <thread>

#include <folly/SocketAddress.h>
#include <folly/synchronization/CallOnce.h>

namespace apache::thrift {
class ThriftServer;
} // namespace apache::thrift

namespace facebook::velox::functions {

constexpr std::string_view kRemoteFunctionPrefix = "remote";

struct RemoteFunctionServiceParams {
// Function prefix to be used for registering the actual functions.
// This is needed when server is running in the same process.
std::string functionPrefix;

// The socket address that the thrift server is running on.
folly::SocketAddress serverAddress;
};

class IRemoteFunctionServiceProvider {
public:
virtual ~IRemoteFunctionServiceProvider() = default;
virtual RemoteFunctionServiceParams getRemoteFunctionServiceParams() = 0;
};

class RemoteFunctionServiceProviderForLocalThrift
: public IRemoteFunctionServiceProvider {
public:
// Creates a thrift server that runs in a separate thread
// and returns the parameters of the service.
RemoteFunctionServiceParams getRemoteFunctionServiceParams() override;

~RemoteFunctionServiceProviderForLocalThrift() override;

private:
void initializeServer();

// Loop until the server is up and running.
bool waitForRunning();

std::shared_ptr<apache::thrift::ThriftServer> server_;
std::unique_ptr<std::thread> thread_;

// Creates a random temporary file name to use to communicate as a unix domain
// socket.
folly::SocketAddress location_ = []() {
char name[] = "/tmp/socketXXXXXX";
int fd = mkstemp(name);
if (fd < 0) {
throw std::runtime_error("Failed to create temporary file for socket");
}
close(fd);
std::string socketPath(name);
// Cleanup existing socket file if it exists.
unlink(socketPath.c_str());
return folly::SocketAddress::makeFromPath(socketPath);
}();

const std::string remotePrefix_{kRemoteFunctionPrefix};
folly::once_flag initializeServiceFlag_;
};

// If no thrift server is currently running, creates a thrift server
// that runs in a separate thread and returns the parameters of the service.
// If a thrift server is already running, returns the parameters of the service.
RemoteFunctionServiceParams startLocalThriftServiceAndGetParams();

} // namespace facebook::velox::functions

0 comments on commit bdcaaf1

Please sign in to comment.