diff --git a/velox/functions/remote/CMakeLists.txt b/velox/functions/remote/CMakeLists.txt index ccc8a2c5ec48..90b8928986e9 100644 --- a/velox/functions/remote/CMakeLists.txt +++ b/velox/functions/remote/CMakeLists.txt @@ -15,3 +15,4 @@ add_subdirectory(if) add_subdirectory(client) add_subdirectory(server) +add_subdirectory(utils) diff --git a/velox/functions/remote/client/tests/CMakeLists.txt b/velox/functions/remote/client/tests/CMakeLists.txt index 1659ad9d7e5a..15c8c6e00ebd 100644 --- a/velox/functions/remote/client/tests/CMakeLists.txt +++ b/velox/functions/remote/client/tests/CMakeLists.txt @@ -24,6 +24,7 @@ target_link_libraries( velox_function_registry velox_functions_test_lib velox_exec_test_lib + velox_functions_remote_utils GTest::gmock GTest::gtest GTest::gtest_main) diff --git a/velox/functions/remote/client/tests/RemoteFunctionTest.cpp b/velox/functions/remote/client/tests/RemoteFunctionTest.cpp index cc49b8f71ccf..af82db0810d1 100644 --- a/velox/functions/remote/client/tests/RemoteFunctionTest.cpp +++ b/velox/functions/remote/client/tests/RemoteFunctionTest.cpp @@ -31,6 +31,7 @@ #include "velox/functions/remote/client/Remote.h" #include "velox/functions/remote/if/gen-cpp2/RemoteFunctionService.h" #include "velox/functions/remote/server/RemoteFunctionService.h" +#include "velox/functions/remote/utils/RemoteFunctionServiceProvider.h" #include "velox/serializers/PrestoSerializer.h" using ::apache::thrift::ThriftServer; @@ -73,8 +74,8 @@ class RemoteFunctionTest public ::testing::WithParamInterface { public: void SetUp() override { - initializeServer(); - registerRemoteFunctions(); + auto params = startLocalThriftServiceAndGetParams(); + registerRemoteFunctions(params); } void TearDown() override { @@ -82,10 +83,10 @@ class RemoteFunctionTest } // Registers a few remote functions to be used in this test. - void registerRemoteFunctions() { + void registerRemoteFunctions(RemoteFunctionServiceParams params) { RemoteVectorFunctionMetadata metadata; metadata.serdeFormat = GetParam(); - metadata.location = location_; + metadata.location = params.serverAddress; // Register the remote adapter. auto plusSignatures = {exec::FunctionSignatureBuilder() @@ -129,70 +130,20 @@ class RemoteFunctionTest // Registers the actual function under a different prefix. This is only // needed for tests since the thrift service runs in the same process. registerFunction( - {remotePrefix_ + ".remote_plus"}); + {params.functionPrefix + ".remote_plus"}); registerFunction( - {remotePrefix_ + ".remote_fail"}); + {params.functionPrefix + ".remote_fail"}); registerFunction( - {remotePrefix_ + ".remote_divide"}); + {params.functionPrefix + ".remote_divide"}); registerFunction( - {remotePrefix_ + ".remote_substr"}); + {params.functionPrefix + ".remote_substr"}); registerFunction>( - {remotePrefix_ + ".remote_opaque"}); + {params.functionPrefix + ".remote_opaque"}); registerOpaqueType("Foo"); OpaqueType::registerSerialization( "Foo", Foo::serialize, Foo::deserialize); } - - void initializeServer() { - auto handler = - std::make_shared(remotePrefix_); - server_ = std::make_shared(); - server_->setInterface(handler); - server_->setAddress(location_); - - thread_ = std::make_unique([&] { server_->serve(); }); - VELOX_CHECK(waitForRunning(), "Unable to initialize thrift server."); - LOG(INFO) << "Thrift server is up and running in local port " << location_; - } - - ~RemoteFunctionTest() { - server_->stop(); - thread_->join(); - LOG(INFO) << "Thrift server stopped."; - } - - private: - // Loop until the server is up and running. - bool waitForRunning() { - for (size_t i = 0; i < 100; ++i) { - if (server_->getServerStatus() == ThriftServer::ServerStatus::RUNNING) { - return true; - } - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - } - return false; - } - - std::shared_ptr server_; - std::unique_ptr thread_; - - // Creates a random temporary file name to use to communicate as a unix domain - // socket. - folly::SocketAddress location_ = []() { - char name[] = "/tmp/socketXXXXXX"; - int fd = mkstemp(name); - if (fd < 0) { - throw std::runtime_error("Failed to create temporary file for socket"); - } - close(fd); - std::string socketPath(name); - // Cleanup existing socket file if it exists. - unlink(socketPath.c_str()); - return folly::SocketAddress::makeFromPath(socketPath); - }(); - - const std::string remotePrefix_{"remote"}; }; TEST_P(RemoteFunctionTest, simple) { diff --git a/velox/functions/remote/utils/CMakeLists.txt b/velox/functions/remote/utils/CMakeLists.txt new file mode 100644 index 000000000000..51192297cf0c --- /dev/null +++ b/velox/functions/remote/utils/CMakeLists.txt @@ -0,0 +1,19 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_library(velox_functions_remote_utils RemoteFunctionServiceProvider.cpp) + +target_link_libraries( + velox_functions_remote_utils + PUBLIC velox_functions_remote_server) diff --git a/velox/functions/remote/utils/RemoteFunctionServiceProvider.cpp b/velox/functions/remote/utils/RemoteFunctionServiceProvider.cpp new file mode 100644 index 000000000000..290dce274997 --- /dev/null +++ b/velox/functions/remote/utils/RemoteFunctionServiceProvider.cpp @@ -0,0 +1,76 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/functions/remote/utils/RemoteFunctionServiceProvider.h" + +#include "thrift/lib/cpp2/server/ThriftServer.h" +#include "velox/functions/remote/server/RemoteFunctionService.h" + +namespace facebook::velox::functions { + +RemoteFunctionServiceParams +RemoteFunctionServiceProviderForLocalThrift::getRemoteFunctionServiceParams() { + folly::call_once(initializeServiceFlag_, [&]() { initializeServer(); }); + return RemoteFunctionServiceParams{ + remotePrefix_, + location_, + }; +} + +void RemoteFunctionServiceProviderForLocalThrift::initializeServer() { + auto handler = + std::make_shared( + remotePrefix_); + server_ = std::make_shared(); + server_->setInterface(handler); + server_->setAddress(location_); + + thread_ = std::make_unique([&] { server_->serve(); }); + VELOX_CHECK(waitForRunning(), "Unable to initialize thrift server."); + LOG(INFO) << "Thrift server is up and running in local port " << location_; +} + +bool RemoteFunctionServiceProviderForLocalThrift::waitForRunning() { + for (size_t i = 0; i < 100; ++i) { + if (server_->getServerStatus() == + apache::thrift::ThriftServer::ServerStatus::RUNNING) { + return true; + } + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + return false; +} + +RemoteFunctionServiceProviderForLocalThrift:: + ~RemoteFunctionServiceProviderForLocalThrift() { + server_->stop(); + thread_->join(); + LOG(INFO) << "Thrift server stopped."; +} + +RemoteFunctionServiceParams startLocalThriftServiceAndGetParams() { + static folly::Singleton + remoteFunctionServiceProviderForLocalThriftSingleton{ + []() { return new RemoteFunctionServiceProviderForLocalThrift(); }}; + auto provider = + remoteFunctionServiceProviderForLocalThriftSingleton.try_get(); + if (!provider) { + throw std::runtime_error("local remoteFunctionProvider is not available"); + } + return provider->getRemoteFunctionServiceParams(); +} + +} // namespace facebook::velox::functions diff --git a/velox/functions/remote/utils/RemoteFunctionServiceProvider.h b/velox/functions/remote/utils/RemoteFunctionServiceProvider.h new file mode 100644 index 000000000000..5646b7b7076a --- /dev/null +++ b/velox/functions/remote/utils/RemoteFunctionServiceProvider.h @@ -0,0 +1,91 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include +#include + +namespace apache::thrift { +class ThriftServer; +} // namespace apache::thrift + +namespace facebook::velox::functions { + +constexpr std::string_view kRemoteFunctionPrefix = "remote"; + +struct RemoteFunctionServiceParams { + // Function prefix to be used for registering the actual functions. + // This is needed when server is running in the same process. + std::string functionPrefix; + + // The socket address that the thrift server is running on. + folly::SocketAddress serverAddress; +}; + +class IRemoteFunctionServiceProvider { + public: + virtual ~IRemoteFunctionServiceProvider() = default; + virtual RemoteFunctionServiceParams getRemoteFunctionServiceParams() = 0; +}; + +class RemoteFunctionServiceProviderForLocalThrift + : public IRemoteFunctionServiceProvider { + public: + // Creates a thrift server that runs in a separate thread + // and returns the parameters of the service. + RemoteFunctionServiceParams getRemoteFunctionServiceParams() override; + + ~RemoteFunctionServiceProviderForLocalThrift() override; + + private: + void initializeServer(); + + // Loop until the server is up and running. + bool waitForRunning(); + + std::shared_ptr server_; + std::unique_ptr thread_; + + // Creates a random temporary file name to use to communicate as a unix domain + // socket. + folly::SocketAddress location_ = []() { + char name[] = "/tmp/socketXXXXXX"; + int fd = mkstemp(name); + if (fd < 0) { + throw std::runtime_error("Failed to create temporary file for socket"); + } + close(fd); + std::string socketPath(name); + // Cleanup existing socket file if it exists. + unlink(socketPath.c_str()); + return folly::SocketAddress::makeFromPath(socketPath); + }(); + + const std::string remotePrefix_{kRemoteFunctionPrefix}; + folly::once_flag initializeServiceFlag_; +}; + +// If no thrift server is currently running, creates a thrift server +// that runs in a separate thread and returns the parameters of the service. +// If a thrift server is already running, returns the parameters of the service. +RemoteFunctionServiceParams startLocalThriftServiceAndGetParams(); + +} // namespace facebook::velox::functions