Skip to content

Commit

Permalink
A large number of performance changes (related to faasm#156)
Browse files Browse the repository at this point in the history
  • Loading branch information
eigenraven committed Oct 15, 2021
1 parent 5034dec commit 3518406
Show file tree
Hide file tree
Showing 37 changed files with 955 additions and 297 deletions.
5 changes: 3 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ add_dependencies(faabric pistache_ext spdlog_ext)
target_link_libraries(faabric PUBLIC
faabricmpi
hiredis
boost_system
boost_filesystem
Boost::system
Boost::filesystem
Boost::Boost
zstd::libzstd_static
${PISTACHE_LIBRARY}
${PROTOBUF_LIBRARY}
Expand Down
39 changes: 38 additions & 1 deletion cmake/ExternalProjects.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ include(FindGit)
find_package(Git)
include (ExternalProject)
include (FetchContent)
find_package (Threads REQUIRED)

# Protobuf
set(PROTOBUF_LIBRARY ${CMAKE_INSTALL_PREFIX}/lib/libprotobuf.so)
Expand Down Expand Up @@ -116,6 +117,42 @@ FetchContent_MakeAvailable(zstd_ext)
target_include_directories(libzstd_static INTERFACE $<BUILD_INTERFACE:${zstd_ext_SOURCE_DIR}/lib>)
add_library(zstd::libzstd_static ALIAS libzstd_static)

# Boost libraries, the header-only ones
FetchContent_Declare(boost_ext
URL "https://boostorg.jfrog.io/artifactory/main/release/1.77.0/source/boost_1_77_0.tar.bz2"
URL_HASH "SHA256=fc9f85fc030e233142908241af7a846e60630aa7388de9a5fafb1f3a26840854"
)
FetchContent_GetProperties(boost_ext)
if(NOT boost_ext_POPULATED)
FetchContent_Populate(boost_ext)
endif()
add_library(Boost INTERFACE)
target_compile_definitions(Boost INTERFACE
BOOST_BEAST_USE_STD_STRING_VIEW
BOOST_ASIO_NO_DEPRECATED
BOOST_ASIO_NO_TS_EXECUTORS
BOOST_ASIO_NO_DEFAULT_LINKED_LIBS
)
target_include_directories(Boost INTERFACE ${boost_ext_SOURCE_DIR})
target_link_libraries(Boost INTERFACE Threads::Threads)
target_compile_features(Boost INTERFACE cxx_std_17)
add_library(Boost::Boost ALIAS Boost)
# Header-only aliases
add_library(Boost::atomic ALIAS Boost)
add_library(Boost::core ALIAS Boost)
add_library(Boost::assert ALIAS Boost)
add_library(Boost::config ALIAS Boost)
add_library(Boost::container_hash ALIAS Boost)
add_library(Boost::detail ALIAS Boost)
add_library(Boost::io ALIAS Boost)
add_library(Boost::iterator ALIAS Boost)
add_library(Boost::smart_ptr ALIAS Boost)
add_library(Boost::system ALIAS Boost)
add_library(Boost::type_traits ALIAS Boost)
add_library(Boost::predef ALIAS Boost)
add_library(Boost::Boost ALIAS Boost)
add_subdirectory(${boost_ext_SOURCE_DIR}/libs/filesystem ${boost_ext_BINARY_DIR}/libs/filesystem EXCLUDE_FROM_ALL)

# ZeroMQ
set(ZEROMQ_LIBRARY ${CMAKE_INSTALL_PREFIX}/lib/libzmq.so)
ExternalProject_Add(libzeromq_ext
Expand All @@ -129,7 +166,7 @@ ExternalProject_Get_Property(libzeromq_ext SOURCE_DIR)
set(LIBZEROMQ_INCLUDE_DIR ${SOURCE_DIR})
ExternalProject_Add(cppzeromq_ext
GIT_REPOSITORY "https://github.com/zeromq/cppzmq.git"
GIT_TAG "v4.7.1"
GIT_TAG "v4.8.0"
CMAKE_CACHE_ARGS "-DCPPZMQ_BUILD_TESTS:BOOL=OFF"
"-DCMAKE_INSTALL_PREFIX:STRING=${CMAKE_INSTALL_PREFIX}"
)
Expand Down
4 changes: 2 additions & 2 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ function(add_example example_name)
${FAABRIC_LIB_DIR}/libprotobuf.so
${FAABRIC_LIB_DIR}/libpistache.so
${FAABRIC_LIB_DIR}/libzmq.so
boost_system
boost_filesystem
Boost::system
Boost::filesystem
hiredis
pthread
)
Expand Down
10 changes: 8 additions & 2 deletions examples/server.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#include <faabric/endpoint/FaabricEndpoint.h>
#include <faabric/endpoint/Endpoint.h>
#include <faabric/endpoint/FaabricEndpointHandler.h>
#include <faabric/runner/FaabricMain.h>
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/transport/context.h>
#include <faabric/util/config.h>
#include <faabric/util/logging.h>

using namespace faabric::scheduler;
Expand Down Expand Up @@ -50,7 +52,11 @@ int main()

// Start endpoint (will also have multiple threads)
SPDLOG_INFO("Starting endpoint");
faabric::endpoint::FaabricEndpoint endpoint;
const auto& config = faabric::util::getSystemConfig();
faabric::endpoint::Endpoint endpoint(
config.endpointPort,
config.endpointNumThreads,
std::make_shared<faabric::endpoint::FaabricEndpointHandler>());
endpoint.start();

SPDLOG_INFO("Shutting down endpoint");
Expand Down
45 changes: 35 additions & 10 deletions include/faabric/endpoint/Endpoint.h
Original file line number Diff line number Diff line change
@@ -1,28 +1,53 @@
#pragma once

#include <functional>
#include <memory>

#include <faabric/proto/faabric.pb.h>
#include <faabric/util/asio.h>
#include <faabric/util/config.h>
#include <pistache/endpoint.h>
#include <pistache/http.h>

namespace faabric::endpoint {
namespace detail {
struct EndpointState;
}

struct HttpRequestContext
{
asio::io_context& ioc;
asio::any_io_executor executor;
std::function<void(faabric::util::BeastHttpResponse&&)> sendFunction;
};

class HttpRequestHandler
{
public:
virtual void onRequest(HttpRequestContext&& ctx,
faabric::util::BeastHttpRequest&& request) = 0;
};

class Endpoint
{
public:
Endpoint();
Endpoint() = delete;
Endpoint(const Endpoint&) = delete;
Endpoint(Endpoint&&) = delete;
Endpoint& operator=(const Endpoint&) = delete;
Endpoint& operator=(Endpoint&&) = delete;
virtual ~Endpoint();

Endpoint(int port, int threadCount);
Endpoint(int port,
int threadCount,
std::shared_ptr<HttpRequestHandler> requestHandlerIn);

void start(bool awaitSignal = true);

void stop();

virtual std::shared_ptr<Pistache::Http::Handler> getHandler() = 0;

private:
int port = faabric::util::getSystemConfig().endpointPort;
int threadCount = faabric::util::getSystemConfig().endpointNumThreads;

Pistache::Http::Endpoint httpEndpoint;
int port;
int threadCount;
std::unique_ptr<detail::EndpointState> state;
std::shared_ptr<HttpRequestHandler> requestHandler;
};
}
16 changes: 0 additions & 16 deletions include/faabric/endpoint/FaabricEndpoint.h

This file was deleted.

25 changes: 13 additions & 12 deletions include/faabric/endpoint/FaabricEndpointHandler.h
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
#pragma once

#include <faabric/endpoint/Endpoint.h>
#include <faabric/proto/faabric.pb.h>
#include <pistache/http.h>

namespace faabric::endpoint {
class FaabricEndpointHandler : public Pistache::Http::Handler
class FaabricEndpointHandler final
: public HttpRequestHandler
, public std::enable_shared_from_this<FaabricEndpointHandler>
{
public:
HTTP_PROTOTYPE(FaabricEndpointHandler)

void onTimeout(const Pistache::Http::Request& request,
Pistache::Http::ResponseWriter writer) override;

void onRequest(const Pistache::Http::Request& request,
Pistache::Http::ResponseWriter response) override;

std::pair<int, std::string> handleFunction(const std::string& requestStr);
void onRequest(HttpRequestContext&& ctx,
faabric::util::BeastHttpRequest&& request) override;

private:
std::pair<int, std::string> executeFunction(faabric::Message& msg);
void executeFunction(HttpRequestContext&& ctx,
faabric::util::BeastHttpResponse&& partialResponse,
faabric::Message&& msg);

void onFunctionResult(HttpRequestContext&& ctx,
faabric::util::BeastHttpResponse&& partialResponse,
faabric::Message& msg);
};
}
3 changes: 2 additions & 1 deletion include/faabric/mpi-native/MpiExecutor.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <faabric/endpoint/FaabricEndpoint.h>
#include <faabric/endpoint/Endpoint.h>
#include <faabric/endpoint/FaabricEndpointHandler.h>
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/scheduler/Scheduler.h>

Expand Down
1 change: 1 addition & 0 deletions include/faabric/scheduler/FunctionCallApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ enum FunctionCalls
Unregister = 3,
GetResources = 4,
SetThreadResult = 5,
DirectResult = 6,
};
}
2 changes: 2 additions & 0 deletions include/faabric/scheduler/FunctionCallClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient
void executeFunctions(
const std::shared_ptr<faabric::BatchExecuteRequest> req);

void sendDirectResult(faabric::Message msg);

void unregister(faabric::UnregisterRequest& req);

private:
Expand Down
2 changes: 2 additions & 0 deletions include/faabric/scheduler/FunctionCallServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ class FunctionCallServer final
void recvExecuteFunctions(const uint8_t* buffer, size_t bufferSize);

void recvUnregister(const uint8_t* buffer, size_t bufferSize);

void recvDirectResult(const uint8_t* buffer, size_t bufferSize);
};
}
57 changes: 52 additions & 5 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/InMemoryMessageQueue.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/util/asio.h>
#include <faabric/util/config.h>
#include <faabric/util/func.h>
#include <faabric/util/queue.h>
#include <faabric/util/snapshot.h>
#include <faabric/util/timing.h>

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <future>
#include <optional>
#include <shared_mutex>

#define AVAILABLE_HOST_SET "available_hosts"
Expand Down Expand Up @@ -54,6 +60,8 @@ class Executor

void finish();

virtual void setup(faabric::Message& msg);

virtual void reset(faabric::Message& msg);

virtual int32_t executeTask(
Expand All @@ -72,6 +80,8 @@ class Executor
protected:
virtual void restore(faabric::Message& msg);

virtual void softShutdown();

virtual void postFinish();

faabric::Message boundMessage;
Expand All @@ -87,11 +97,37 @@ class Executor
std::vector<std::shared_ptr<std::thread>> threadPoolThreads;
std::vector<std::shared_ptr<std::thread>> deadThreads;

std::mutex setupMutex;
std::atomic_bool setupDone;

std::vector<faabric::util::Queue<ExecutorTask>> threadTaskQueues;

void threadPoolThread(int threadPoolIdx);
};

struct MessageLocalResult final
{
std::promise<std::unique_ptr<faabric::Message>> promise;
int event_fd = -1;

MessageLocalResult();
MessageLocalResult(const MessageLocalResult&) = delete;
inline MessageLocalResult(MessageLocalResult&& other)
{
this->operator=(std::move(other));
}
MessageLocalResult& operator=(const MessageLocalResult&) = delete;
inline MessageLocalResult& operator=(MessageLocalResult&& other)
{
this->promise = std::move(other.promise);
this->event_fd = other.event_fd;
other.event_fd = -1;
return *this;
}
~MessageLocalResult();
void set_value(std::unique_ptr<faabric::Message>&& msg);
};

class Scheduler
{
public:
Expand Down Expand Up @@ -127,6 +163,12 @@ class Scheduler

faabric::Message getFunctionResult(unsigned int messageId, int timeout);

void getFunctionResultAsync(unsigned int messageId,
int timeoutMs,
asio::io_context& ioc,
asio::any_io_executor& executor,
std::function<void(faabric::Message&)> handler);

void setThreadResult(const faabric::Message& msg, int32_t returnValue);

void pushSnapshotDiffs(
Expand Down Expand Up @@ -182,7 +224,15 @@ class Scheduler

ExecGraph getFunctionExecGraph(unsigned int msgId);

void updateMonitoring();

std::atomic_int32_t monitorLocallyScheduledTasks;
std::atomic_int32_t monitorStartedTasks;
std::atomic_int32_t monitorWaitingTasks;

private:
int monitorFd = -1;

std::string thisHost;

faabric::util::SystemConfig& conf;
Expand All @@ -207,8 +257,7 @@ class Scheduler
std::set<std::string> availableHostsCache;
std::unordered_map<std::string, std::set<std::string>> registeredHosts;

std::unordered_map<uint32_t,
std::promise<std::unique_ptr<faabric::Message>>>
std::unordered_map<uint32_t, std::shared_ptr<MessageLocalResult>>
localResults;
std::mutex localResultsMutex;

Expand All @@ -220,9 +269,7 @@ class Scheduler
std::vector<std::string> getUnregisteredHosts(const std::string& funcStr,
bool noCache = false);

std::shared_ptr<Executor> claimExecutor(
faabric::Message& msg,
faabric::util::FullLock& schedulerLock);
std::shared_ptr<Executor> claimExecutor(faabric::Message& msg);

faabric::HostResources getHostResources(const std::string& host);

Expand Down
2 changes: 1 addition & 1 deletion include/faabric/snapshot/SnapshotRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class SnapshotRegistry

bool snapshotExists(const std::string& key);

void mapSnapshot(const std::string& key, uint8_t* target);
uint8_t* mapSnapshot(const std::string& key, uint8_t* target);

void takeSnapshot(const std::string& key,
faabric::util::SnapshotData data,
Expand Down
Loading

0 comments on commit 3518406

Please sign in to comment.