Skip to content

Commit

Permalink
pvf unix socket (#2327)
Browse files Browse the repository at this point in the history
Signed-off-by: turuslan <[email protected]>
  • Loading branch information
turuslan authored Dec 26, 2024
1 parent 78779b9 commit bc110f7
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 80 deletions.
72 changes: 36 additions & 36 deletions core/parachain/pvf/kagome_pvf_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/

#include <filesystem>
#include <iostream>
#include <memory>
#include <ranges>
#include <span>
Expand All @@ -23,6 +22,7 @@

#include <fmt/format.h>
#include <boost/asio.hpp>
#include <boost/asio/local/stream_protocol.hpp>
#include <boost/process.hpp>
#include <libp2p/basic/scheduler/asio_scheduler_backend.hpp>
#include <libp2p/basic/scheduler/scheduler_impl.hpp>
Expand Down Expand Up @@ -62,6 +62,8 @@
}

namespace kagome::parachain {
using unix = boost::asio::local::stream_protocol;

namespace {
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
static kagome::log::Logger logger;
Expand Down Expand Up @@ -229,26 +231,21 @@ namespace kagome::parachain {
}
#endif

outcome::result<void> readStdin(std::span<uint8_t> out) {
std::cin.read(
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
reinterpret_cast<char *>(out.data()),
// NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions)
out.size());
if (not std::cin.good()) {
return std::errc::io_error;
}
return outcome::success();
}

template <typename T>
outcome::result<T> decodeInput() {
outcome::result<T> decodeInput(unix::socket &socket) {
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-member-init,hicpp-member-init)
std::array<uint8_t, sizeof(uint32_t)> length_bytes;
OUTCOME_TRY(readStdin(length_bytes));
boost::system::error_code ec;
boost::asio::read(socket, boost::asio::buffer(length_bytes), ec);
if (ec) {
return ec;
}
OUTCOME_TRY(message_length, scale::decode<uint32_t>(length_bytes));
std::vector<uint8_t> packed_message(message_length, 0);
OUTCOME_TRY(readStdin(packed_message));
boost::asio::read(socket, boost::asio::buffer(packed_message), ec);
if (ec) {
return ec;
}
return scale::decode<T>(packed_message);
}

Expand Down Expand Up @@ -282,8 +279,16 @@ namespace kagome::parachain {
}
}

outcome::result<void> pvf_worker_main_outcome() {
OUTCOME_TRY(input_config, decodeInput<PvfWorkerInputConfig>());
outcome::result<void> pvf_worker_main_outcome(
const std::string &unix_socket_path) {
boost::asio::io_context io_context;
unix::socket socket{io_context};
boost::system::error_code ec;
socket.connect(unix_socket_path, ec);
if (ec) {
return ec;
}
OUTCOME_TRY(input_config, decodeInput<PvfWorkerInputConfig>(socket));
kagome::log::tuneLoggingSystem(input_config.log_params);

SL_VERBOSE(logger, "Cache directory: {}", input_config.cache_dir);
Expand Down Expand Up @@ -347,7 +352,7 @@ namespace kagome::parachain {
OUTCOME_TRY(factory, createModuleFactory(injector, input_config.engine));
std::shared_ptr<runtime::Module> module;
while (true) {
OUTCOME_TRY(input, decodeInput<PvfWorkerInput>());
OUTCOME_TRY(input, decodeInput<PvfWorkerInput>(socket));

if (auto *code_params = std::get_if<PvfWorkerInputCodeParams>(&input)) {
auto &path = code_params->path;
Expand All @@ -370,17 +375,14 @@ namespace kagome::parachain {
OUTCOME_TRY(instance->resetEnvironment());
OUTCOME_TRY(len, scale::encode<uint32_t>(result.size()));

std::cout.write(
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
reinterpret_cast<const char *>(len.data()),
// NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions)
len.size());
std::cout.write(
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
reinterpret_cast<const char *>(result.data()),
// NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions)
result.size());
std::cout.flush();
boost::asio::write(socket, boost::asio::buffer(len), ec);
if (ec) {
return ec;
}
boost::asio::write(socket, boost::asio::buffer(result), ec);
if (ec) {
return ec;
}
}
}

Expand All @@ -399,14 +401,12 @@ namespace kagome::parachain {
}
kagome::log::setLoggingSystem(logging_system);
logger = kagome::log::createLogger("PVF Worker", "parachain");

if (!checkEnvVarsEmpty(env)) {
logger->error(
"PVF worker processes must not have any environment variables.");
if (argc < 2) {
SL_ERROR(logger, "missing unix socket path arg");
return EXIT_FAILURE;
}

if (auto r = pvf_worker_main_outcome(); not r) {
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
if (auto r = pvf_worker_main_outcome(argv[1]); not r) {
SL_ERROR(logger, "PVF worker process failed: {}", r.error());
return EXIT_FAILURE;
}
Expand Down
4 changes: 4 additions & 0 deletions core/parachain/pvf/kagome_pvf_worker_injector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,22 @@
#include "crypto/ed25519/ed25519_provider_impl.hpp"
#include "crypto/elliptic_curves/elliptic_curves_impl.hpp"
#include "crypto/hasher/hasher_impl.hpp"
#include "crypto/key_store.hpp"
#include "crypto/pbkdf2/impl/pbkdf2_provider_impl.hpp"
#include "crypto/secp256k1/secp256k1_provider_impl.hpp"
#include "crypto/sr25519/sr25519_provider_impl.hpp"
#include "host_api/impl/host_api_factory_impl.hpp"
#include "injector/bind_by_lambda.hpp"
#include "offchain/offchain_persistent_storage.hpp"
#include "offchain/offchain_worker_pool.hpp"
#include "parachain/pvf/pvf_worker_types.hpp"
#include "runtime/binaryen/instance_environment_factory.hpp"
#include "runtime/binaryen/module/module_factory_impl.hpp"
#include "runtime/common/core_api_factory_impl.hpp"
#include "runtime/common/runtime_properties_cache_impl.hpp"
#include "runtime/memory_provider.hpp"
#include "runtime/module.hpp"
#include "runtime/runtime_instances_pool.hpp"
#include "runtime/wasm_compiler_definitions.hpp" // this header-file is generated
#include "storage/trie/serialization/trie_serializer_impl.hpp"
#include "storage/trie/trie_storage.hpp"
Expand Down
101 changes: 57 additions & 44 deletions core/parachain/pvf/workers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,27 @@

#include "parachain/pvf/workers.hpp"

#include <boost/asio/buffered_read_stream.hpp>
#include <boost/asio/buffered_write_stream.hpp>
#include <boost/asio/local/stream_protocol.hpp>
#include <boost/process.hpp>
#include <libp2p/basic/scheduler.hpp>
#include <libp2p/common/asio_buffer.hpp>
#include <qtils/option_take.hpp>

#include "application/app_configuration.hpp"
#include "common/main_thread_pool.hpp"
#include "filesystem/common.hpp"
#include "parachain/pvf/pvf_worker_types.hpp"
#include "utils/get_exe_path.hpp"
#include "utils/weak_macro.hpp"

namespace kagome::parachain {
constexpr auto kMetricQueueSize = "kagome_pvf_queue_size";
using unix = boost::asio::local::stream_protocol;

struct AsyncPipe : boost::process::async_pipe {
using async_pipe::async_pipe;
using lowest_layer_type = AsyncPipe;
};
constexpr auto kMetricQueueSize = "kagome_pvf_queue_size";

struct ProcessAndPipes : std::enable_shared_from_this<ProcessAndPipes> {
AsyncPipe pipe_stdin;
// NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members)
AsyncPipe &writer;
AsyncPipe pipe_stdout;
// NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members)
AsyncPipe &reader;
boost::process::child process;
std::optional<unix::socket> socket;
std::shared_ptr<Buffer> writing = std::make_shared<Buffer>();
std::shared_ptr<Buffer> reading = std::make_shared<Buffer>();

Expand All @@ -44,39 +36,34 @@ namespace kagome::parachain {

ProcessAndPipes(boost::asio::io_context &io_context,
const std::string &exe,
const std::string &unix_socket_path,
const Config &config)
: pipe_stdin{io_context},
writer{pipe_stdin},
pipe_stdout{io_context},
reader{pipe_stdout},
process{
exe,
boost::process::args({"pvf-worker"}),
boost::process::env(boost::process::environment()),
: process{
exe,
boost::process::args({"pvf-worker", unix_socket_path}),
boost::process::env(boost::process::environment()),
// LSAN doesn't work in secure mode
#ifdef KAGOME_WITH_ASAN
boost::process::env["ASAN_OPTIONS"] =
config.disable_lsan ? "detect_leaks=0" : "",
boost::process::env["ASAN_OPTIONS"] =
config.disable_lsan ? "detect_leaks=0" : "",
#endif
boost::process::std_out > pipe_stdout,
boost::process::std_in < pipe_stdin,
} {
} {
}

void write(Buffer data, auto cb) {
auto len = std::make_shared<common::Buffer>(
scale::encode<uint32_t>(data.size()).value());
*writing = std::move(data);
boost::asio::async_write(
writer,
*socket,
libp2p::asioBuffer(*len),
[WEAK_SELF, cb, len](boost::system::error_code ec, size_t) mutable {
WEAK_LOCK(self);
if (ec) {
return cb(ec);
}
boost::asio::async_write(
self->writer,
*self->socket,
libp2p::asioBuffer(*self->writing),
[weak_self, cb](boost::system::error_code ec, size_t) mutable {
WEAK_LOCK(self);
Expand All @@ -95,7 +82,7 @@ namespace kagome::parachain {
void read(auto cb) {
auto len = std::make_shared<common::Blob<sizeof(uint32_t)>>();
boost::asio::async_read(
reader,
*socket,
libp2p::asioBuffer(*len),
[WEAK_SELF, cb{std::move(cb)}, len](boost::system::error_code ec,
size_t) mutable {
Expand All @@ -109,7 +96,7 @@ namespace kagome::parachain {
}
self->reading->resize(len_res.value());
boost::asio::async_read(
self->reader,
*self->socket,
libp2p::asioBuffer(*self->reading),
[cb{std::move(cb)}, reading{self->reading}](
boost::system::error_code ec, size_t) mutable {
Expand Down Expand Up @@ -162,20 +149,46 @@ namespace kagome::parachain {
#if defined(__linux__) && defined(KAGOME_WITH_ASAN)
config.disable_lsan = !worker_config_.force_disable_secure_mode;
#endif
auto process =
std::make_shared<ProcessAndPipes>(*io_context_, exe_, config);
process->writeScale(
worker_config_,
[WEAK_SELF, job{std::move(job)}, used{std::move(used)}, process](
outcome::result<void> r) mutable {
WEAK_LOCK(self);
if (not r) {
return job.cb(r.error());
}
self->writeCode(std::move(job),
{.process = std::move(process)},
std::move(used));
});
auto unix_socket_path = filesystem::unique_path(
std::filesystem::path{worker_config_.cache_dir}
/ "unix_socket.%%%%%%");
std::error_code ec;
std::filesystem::remove(unix_socket_path, ec);
if (ec) {
return job.cb(ec);
}
auto acceptor = std::make_shared<unix::acceptor>(
*io_context_, unix_socket_path.native());
auto process = std::make_shared<ProcessAndPipes>(
*io_context_, exe_, unix_socket_path, config);
acceptor->async_accept([WEAK_SELF,
job{std::move(job)},
used,
unix_socket_path,
acceptor,
process{std::move(process)}](
boost::system::error_code ec,
unix::socket &&socket) mutable {
std::error_code ec2;
std::filesystem::remove(unix_socket_path, ec2);
WEAK_LOCK(self);
if (ec) {
return job.cb(ec);
}
process->socket = std::move(socket);
process->writeScale(
self->worker_config_,
[weak_self, job{std::move(job)}, used{std::move(used)}, process](
outcome::result<void> r) mutable {
WEAK_LOCK(self);
if (not r) {
return job.cb(r.error());
}
self->writeCode(std::move(job),
{.process = std::move(process)},
std::move(used));
});
});
return;
}
findFree(std::move(job));
Expand Down

0 comments on commit bc110f7

Please sign in to comment.