Skip to content

Commit

Permalink
Add a ThreadFactory class for creating threads (envoyproxy#5055)
Browse files Browse the repository at this point in the history
This enables injecting a custom thread factory into the envoy server

Signed-off-by: Elisha Ziskind <[email protected]>
  • Loading branch information
eziskind authored and mattklein123 committed Nov 20, 2018
1 parent 476468e commit 46e1411
Show file tree
Hide file tree
Showing 50 changed files with 238 additions and 139 deletions.
15 changes: 15 additions & 0 deletions include/envoy/thread/thread.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <functional>
#include <memory>

#include "envoy/common/pure.h"
Expand All @@ -21,6 +22,20 @@ class Thread {

typedef std::unique_ptr<Thread> ThreadPtr;

/**
* Interface providing a mechanism for creating threads.
*/
class ThreadFactory {
public:
virtual ~ThreadFactory() {}

/**
* Create a thread.
* @param thread_routine supplies the function to invoke in the thread.
*/
virtual ThreadPtr createThread(std::function<void()> thread_routine) PURE;
};

/**
* Like the C++11 "basic lockable concept" but a pure virtual interface vs. a template, and
* with thread annotations.
Expand Down
9 changes: 5 additions & 4 deletions source/common/api/api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
namespace Envoy {
namespace Api {

Impl::Impl(std::chrono::milliseconds file_flush_interval_msec,
Thread::ThreadFactory& thread_factory)
: file_flush_interval_msec_(file_flush_interval_msec), thread_factory_(thread_factory) {}

Event::DispatcherPtr Impl::allocateDispatcher(Event::TimeSystem& time_system) {
return Event::DispatcherPtr{new Event::DispatcherImpl(time_system)};
}

Impl::Impl(std::chrono::milliseconds file_flush_interval_msec)
: file_flush_interval_msec_(file_flush_interval_msec) {}

Filesystem::FileSharedPtr Impl::createFile(const std::string& path, Event::Dispatcher& dispatcher,
Thread::BasicLockable& lock, Stats::Store& stats_store) {
return std::make_shared<Filesystem::FileImpl>(path, dispatcher, lock, stats_store, *this,
Expand All @@ -28,7 +29,7 @@ bool Impl::fileExists(const std::string& path) { return Filesystem::fileExists(p
std::string Impl::fileReadToEnd(const std::string& path) { return Filesystem::fileReadToEnd(path); }

Thread::ThreadPtr Impl::createThread(std::function<void()> thread_routine) {
return std::make_unique<Thread::ThreadImpl>(thread_routine);
return thread_factory_.createThread(thread_routine);
}

} // namespace Api
Expand Down
4 changes: 3 additions & 1 deletion source/common/api/api_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "envoy/api/api.h"
#include "envoy/event/timer.h"
#include "envoy/filesystem/filesystem.h"
#include "envoy/thread/thread.h"

namespace Envoy {
namespace Api {
Expand All @@ -15,7 +16,7 @@ namespace Api {
*/
class Impl : public Api::Api {
public:
Impl(std::chrono::milliseconds file_flush_interval_msec = std::chrono::milliseconds(1000));
Impl(std::chrono::milliseconds file_flush_interval_msec, Thread::ThreadFactory& thread_factory);

// Api::Api
Event::DispatcherPtr allocateDispatcher(Event::TimeSystem& time_system) override;
Expand All @@ -28,6 +29,7 @@ class Impl : public Api::Api {

private:
std::chrono::milliseconds file_flush_interval_msec_;
Thread::ThreadFactory& thread_factory_;
};

} // namespace Api
Expand Down
43 changes: 29 additions & 14 deletions source/common/common/thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,35 @@
namespace Envoy {
namespace Thread {

ThreadImpl::ThreadImpl(std::function<void()> thread_routine) : thread_routine_(thread_routine) {
RELEASE_ASSERT(Logger::Registry::initialized(), "");
int rc = pthread_create(&thread_id_, nullptr,
[](void* arg) -> void* {
static_cast<ThreadImpl*>(arg)->thread_routine_();
return nullptr;
},
this);
RELEASE_ASSERT(rc == 0, "");
}

void ThreadImpl::join() {
int rc = pthread_join(thread_id_, nullptr);
RELEASE_ASSERT(rc == 0, "");
/**
* Wrapper for a pthread thread. We don't use std::thread because it eats exceptions and leads to
* unusable stack traces.
*/
class ThreadImpl : public Thread {
public:
ThreadImpl(std::function<void()> thread_routine) : thread_routine_(thread_routine) {
RELEASE_ASSERT(Logger::Registry::initialized(), "");
int rc = pthread_create(&thread_id_, nullptr,
[](void* arg) -> void* {
static_cast<ThreadImpl*>(arg)->thread_routine_();
return nullptr;
},
this);
RELEASE_ASSERT(rc == 0, "");
}

void join() override {
int rc = pthread_join(thread_id_, nullptr);
RELEASE_ASSERT(rc == 0, "");
}

private:
std::function<void()> thread_routine_;
pthread_t thread_id_;
};

ThreadPtr ThreadFactoryImpl::createThread(std::function<void()> thread_routine) {
return std::make_unique<ThreadImpl>(thread_routine);
}

int32_t currentThreadId() {
Expand Down
16 changes: 4 additions & 12 deletions source/common/common/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,13 @@ typedef int32_t ThreadId;
ThreadId currentThreadId();

/**
* Wrapper for a pthread thread. We don't use std::thread because it eats exceptions and leads to
* unusable stack traces.
* Implementation of ThreadFactory
*/
class ThreadImpl : public Thread {
class ThreadFactoryImpl : public ThreadFactory {
public:
ThreadImpl(std::function<void()> thread_routine);
ThreadFactoryImpl() {}

/**
* Join on thread exit.
*/
void join() override;

private:
std::function<void()> thread_routine_;
pthread_t thread_id_;
ThreadPtr createThread(std::function<void()> thread_routine) override;
};

/**
Expand Down
14 changes: 8 additions & 6 deletions source/exe/main_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ Runtime::LoaderPtr ProdComponentFactory::createRuntime(Server::Instance& server,

MainCommonBase::MainCommonBase(OptionsImpl& options, Event::TimeSystem& time_system,
TestHooks& test_hooks, Server::ComponentFactory& component_factory,
std::unique_ptr<Runtime::RandomGenerator>&& random_generator)
: options_(options), component_factory_(component_factory) {
std::unique_ptr<Runtime::RandomGenerator>&& random_generator,
Thread::ThreadFactory& thread_factory)
: options_(options), component_factory_(component_factory), thread_factory_(thread_factory) {
ares_library_init(ARES_LIB_INIT_ALL);
Event::Libevent::Global::initialize();
RELEASE_ASSERT(Envoy::Server::validateProtoDescriptors(), "");
Expand Down Expand Up @@ -74,7 +75,7 @@ MainCommonBase::MainCommonBase(OptionsImpl& options, Event::TimeSystem& time_sys

server_ = std::make_unique<Server::InstanceImpl>(
options_, time_system, local_address, test_hooks, *restarter_, *stats_store_,
access_log_lock, component_factory, std::move(random_generator), *tls_);
access_log_lock, component_factory, std::move(random_generator), *tls_, thread_factory);

break;
}
Expand Down Expand Up @@ -103,7 +104,7 @@ bool MainCommonBase::run() {
return true;
case Server::Mode::Validate: {
auto local_address = Network::Utility::getLocalAddress(options_.localAddressIpVersion());
return Server::validateConfig(options_, local_address, component_factory_);
return Server::validateConfig(options_, local_address, component_factory_, thread_factory_);
}
case Server::Mode::InitOnly:
PERF_DUMP();
Expand All @@ -127,7 +128,7 @@ void MainCommonBase::adminRequest(absl::string_view path_and_query, absl::string
MainCommon::MainCommon(int argc, const char* const* argv)
: options_(argc, argv, &MainCommon::hotRestartVersion, spdlog::level::info),
base_(options_, real_time_system_, default_test_hooks_, prod_component_factory_,
std::make_unique<Runtime::RandomGeneratorImpl>()) {}
std::make_unique<Runtime::RandomGeneratorImpl>(), thread_factory_) {}

std::string MainCommon::hotRestartVersion(uint64_t max_num_stats, uint64_t max_stat_name_len,
bool hot_restart_enabled) {
Expand All @@ -152,9 +153,10 @@ int main_common(OptionsImpl& options) {
Event::RealTimeSystem real_time_system_;
DefaultTestHooks default_test_hooks_;
ProdComponentFactory prod_component_factory_;
Thread::ThreadFactoryImpl thread_factory_;
MainCommonBase main_common(options, real_time_system_, default_test_hooks_,
prod_component_factory_,
std::make_unique<Runtime::RandomGeneratorImpl>());
std::make_unique<Runtime::RandomGeneratorImpl>(), thread_factory_);
return main_common.run() ? EXIT_SUCCESS : EXIT_FAILURE;
} catch (EnvoyException& e) {
return EXIT_FAILURE;
Expand Down
6 changes: 5 additions & 1 deletion source/exe/main_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "envoy/event/timer.h"
#include "envoy/runtime/runtime.h"

#include "common/common/thread.h"
#include "common/event/real_time_system.h"
#include "common/stats/thread_local_store.h"
#include "common/thread_local/thread_local_impl.h"
Expand Down Expand Up @@ -32,7 +33,8 @@ class MainCommonBase {
// destructed.
MainCommonBase(OptionsImpl& options, Event::TimeSystem& time_system, TestHooks& test_hooks,
Server::ComponentFactory& component_factory,
std::unique_ptr<Runtime::RandomGenerator>&& random_generator);
std::unique_ptr<Runtime::RandomGenerator>&& random_generator,
Thread::ThreadFactory& thread_factory);
~MainCommonBase();

bool run();
Expand Down Expand Up @@ -62,6 +64,7 @@ class MainCommonBase {
Envoy::OptionsImpl& options_;

Server::ComponentFactory& component_factory_;
Thread::ThreadFactory& thread_factory_;

std::unique_ptr<ThreadLocal::InstanceImpl> tls_;
std::unique_ptr<Server::HotRestart> restarter_;
Expand Down Expand Up @@ -107,6 +110,7 @@ class MainCommon {
Event::RealTimeSystem real_time_system_;
DefaultTestHooks default_test_hooks_;
ProdComponentFactory prod_component_factory_;
Thread::ThreadFactoryImpl thread_factory_;
MainCommonBase base_;
};

Expand Down
5 changes: 3 additions & 2 deletions source/server/config_validation/api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
namespace Envoy {
namespace Api {

ValidationImpl::ValidationImpl(std::chrono::milliseconds file_flush_interval_msec)
: Impl(file_flush_interval_msec) {}
ValidationImpl::ValidationImpl(std::chrono::milliseconds file_flush_interval_msec,
Thread::ThreadFactory& thread_factory)
: Impl(file_flush_interval_msec, thread_factory) {}

Event::DispatcherPtr ValidationImpl::allocateDispatcher(Event::TimeSystem& time_system) {
return Event::DispatcherPtr{new Event::ValidationDispatcher(time_system)};
Expand Down
3 changes: 2 additions & 1 deletion source/server/config_validation/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ namespace Api {
*/
class ValidationImpl : public Impl {
public:
ValidationImpl(std::chrono::milliseconds file_flush_interval_msec);
ValidationImpl(std::chrono::milliseconds file_flush_interval_msec,
Thread::ThreadFactory& thread_factory);

Event::DispatcherPtr allocateDispatcher(Event::TimeSystem&) override;
};
Expand Down
9 changes: 5 additions & 4 deletions source/server/config_validation/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ namespace Envoy {
namespace Server {

bool validateConfig(Options& options, Network::Address::InstanceConstSharedPtr local_address,
ComponentFactory& component_factory) {
ComponentFactory& component_factory, Thread::ThreadFactory& thread_factory) {
Thread::MutexBasicLockable access_log_lock;
Stats::IsolatedStoreImpl stats_store;

try {
Event::RealTimeSystem time_system;
ValidationInstance server(options, time_system, local_address, stats_store, access_log_lock,
component_factory);
component_factory, thread_factory);
std::cout << "configuration '" << options.configPath() << "' OK" << std::endl;
server.shutdown();
return true;
Expand All @@ -40,9 +40,10 @@ ValidationInstance::ValidationInstance(Options& options, Event::TimeSystem& time
Network::Address::InstanceConstSharedPtr local_address,
Stats::IsolatedStoreImpl& store,
Thread::BasicLockable& access_log_lock,
ComponentFactory& component_factory)
ComponentFactory& component_factory,
Thread::ThreadFactory& thread_factory)
: options_(options), time_system_(time_system), stats_store_(store),
api_(new Api::ValidationImpl(options.fileFlushIntervalMsec())),
api_(new Api::ValidationImpl(options.fileFlushIntervalMsec(), thread_factory)),
dispatcher_(api_->allocateDispatcher(time_system)),
singleton_manager_(new Singleton::ManagerImpl()),
access_log_manager_(*api_, *dispatcher_, access_log_lock, store), mutex_tracer_(nullptr) {
Expand Down
4 changes: 2 additions & 2 deletions source/server/config_validation/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace Server {
* the config is valid, false if invalid.
*/
bool validateConfig(Options& options, Network::Address::InstanceConstSharedPtr local_address,
ComponentFactory& component_factory);
ComponentFactory& component_factory, Thread::ThreadFactory& thread_factory);

/**
* ValidationInstance does the bulk of the work for config-validation runs of Envoy. It implements
Expand All @@ -56,7 +56,7 @@ class ValidationInstance : Logger::Loggable<Logger::Id::main>,
ValidationInstance(Options& options, Event::TimeSystem& time_system,
Network::Address::InstanceConstSharedPtr local_address,
Stats::IsolatedStoreImpl& store, Thread::BasicLockable& access_log_lock,
ComponentFactory& component_factory);
ComponentFactory& component_factory, Thread::ThreadFactory& thread_factory);

// Server::Instance
Admin& admin() override { return admin_; }
Expand Down
4 changes: 2 additions & 2 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ InstanceImpl::InstanceImpl(Options& options, Event::TimeSystem& time_system,
Thread::BasicLockable& access_log_lock,
ComponentFactory& component_factory,
Runtime::RandomGeneratorPtr&& random_generator,
ThreadLocal::Instance& tls)
ThreadLocal::Instance& tls, Thread::ThreadFactory& thread_factory)
: shutdown_(false), options_(options), time_system_(time_system), restarter_(restarter),
start_time_(time(nullptr)), original_start_time_(start_time_), stats_store_(store),
thread_local_(tls), api_(new Api::Impl(options.fileFlushIntervalMsec())),
thread_local_(tls), api_(new Api::Impl(options.fileFlushIntervalMsec(), thread_factory)),
secret_manager_(std::make_unique<Secret::SecretManagerImpl>()),
dispatcher_(api_->allocateDispatcher(time_system)),
singleton_manager_(new Singleton::ManagerImpl()),
Expand Down
3 changes: 2 additions & 1 deletion source/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
Network::Address::InstanceConstSharedPtr local_address, TestHooks& hooks,
HotRestart& restarter, Stats::StoreRoot& store,
Thread::BasicLockable& access_log_lock, ComponentFactory& component_factory,
Runtime::RandomGeneratorPtr&& random_generator, ThreadLocal::Instance& tls);
Runtime::RandomGeneratorPtr&& random_generator, ThreadLocal::Instance& tls,
Thread::ThreadFactory& thread_factory);

~InstanceImpl() override;

Expand Down
1 change: 1 addition & 0 deletions test/common/api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ envoy_cc_test(
deps = [
"//source/common/api:api_lib",
"//test/test_common:environment_lib",
"//test/test_common:utility_lib",
],
)
5 changes: 3 additions & 2 deletions test/common/api/api_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
#include "common/api/api_impl.h"

#include "test/test_common/environment.h"
#include "test/test_common/utility.h"

#include "gtest/gtest.h"

namespace Envoy {
namespace Api {

TEST(ApiImplTest, readFileToEnd) {
Impl api(std::chrono::milliseconds(10000));
Impl api(std::chrono::milliseconds(1000), Thread::threadFactoryForTest());

const std::string data = "test read To End\nWith new lines.";
const std::string file_path = TestEnvironment::writeStringToFileForTest("test_api_envoy", data);
Expand All @@ -20,7 +21,7 @@ TEST(ApiImplTest, readFileToEnd) {
}

TEST(ApiImplTest, fileExists) {
Impl api(std::chrono::milliseconds(10000));
Impl api(std::chrono::milliseconds(1000), Thread::threadFactoryForTest());

EXPECT_TRUE(api.fileExists("/dev/null"));
EXPECT_FALSE(api.fileExists("/dev/blahblahblah"));
Expand Down
2 changes: 2 additions & 0 deletions test/common/event/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ envoy_cc_test(
"//source/common/event:dispatcher_lib",
"//test/mocks:common_lib",
"//test/test_common:test_time_lib",
"//test/test_common:utility_lib",
],
)

Expand Down Expand Up @@ -46,5 +47,6 @@ envoy_cc_test(
"//test/mocks/server:server_mocks",
"//test/mocks/stats:stats_mocks",
"//test/test_common:test_time_lib",
"//test/test_common:utility_lib",
],
)
Loading

0 comments on commit 46e1411

Please sign in to comment.