From f1fdde4601f83edcdbd3430502fb458b039e7c85 Mon Sep 17 00:00:00 2001 From: Eduardo Bart Date: Sat, 11 Nov 2023 12:14:49 -0300 Subject: [PATCH] refactor: make possible to compile libcartesi without threading support --- src/Makefile | 26 ++++++---- src/machine-c-api.cpp | 4 +- src/machine-c-api.h | 1 - src/machine.cpp | 107 ++++++++++++++++++------------------------ src/os.cpp | 46 ++++++++++++++++++ src/os.h | 33 +++++++++++++ 6 files changed, 143 insertions(+), 74 deletions(-) diff --git a/src/Makefile b/src/Makefile index 5f64c567c..e3a287ece 100644 --- a/src/Makefile +++ b/src/Makefile @@ -37,6 +37,7 @@ GRPC_CPP_PLUGIN=$(shell which grpc_cpp_plugin) release?=no sanitize?=no coverage?=no +nothreads?=no COVERAGE_TOOLCHAIN?=gcc COVERAGE_OUTPUT_DIR?=coverage @@ -53,13 +54,13 @@ ifeq ($(TARGET_OS),Darwin) SOLDFLAGS=-bundle -undefined dynamic_lookup LIBLDFLAGS=-dynamiclib -undefined dynamic_lookup EXELDFLAGS= +PTHREAD_CFLAGS= +PTHREAD_LDFLAGS=-lpthread CC=clang CXX=clang++ AR_EXEC=libtool -static INCS= -PTHREAD_LIB=-lpthread - ifeq ($(MACOSX_DEPLOYMENT_TARGET),) export MACOSX_DEPLOYMENT_TARGET := $(shell sw_vers -productVersion | sed -r "s/([[:digit:]]+)\.([[:digit:]]+)\..+/\1.\2.0/") endif @@ -95,15 +96,16 @@ PROFILE_DATA=default.profdata else # Linux specific setup -SOLDFLAGS=-shared -fPIC -pthread $(GCLDFLAGS) -LIBLDFLAGS=$(SOLDFLAGS) -Wl,-z,defs +SOLDFLAGS=-shared -fPIC $(GCLDFLAGS) +LIBLDFLAGS=$(SOLDFLAGS) EXELDFLAGS=$(GCLDFLAGS) +PTHREAD_CFLAGS=-pthread +PTHREAD_LDFLAGS=-pthread -lpthread CC=gcc CXX=g++ AR_EXEC=ar rcs INCS= -PTHREAD_LIB=-lpthread BOOST_INC= GRPC_PROTOBUF_INC=$(shell pkg-config --cflags-only-I grpc++ protobuf) GRPC_PROTOBUF_LIB=$(shell pkg-config --libs grpc++ protobuf) @@ -125,7 +127,7 @@ LUACARTESI_GRPC_LIBS=$(GRPC_PROTOBUF_LIB) LUACARTESI_JSONRPC_LIBS= REMOTE_CARTESI_MACHINE_LIBS=$(GRPC_PROTOBUF_LIB) JSONRPC_REMOTE_CARTESI_MACHINE_LIBS= -TEST_MACHINE_C_API_LIBS=$(GRPC_PROTOBUF_LIB) $(PTHREAD_LIB) +TEST_MACHINE_C_API_LIBS=$(GRPC_PROTOBUF_LIB) HASH_LIBS= #DEFS+= -DMT_ALL_DIRTY @@ -249,6 +251,14 @@ EMPTY:= SPACE:=$(EMPTY) $(EMPTY) CLANG_TIDY_HEADER_FILTER=$(PWD)/($(subst $(SPACE),|,$(LINTER_HEADERS))) +ifeq ($(nothreads),no) +CFLAGS+=$(PTHREAD_CFLAGS) +CXXFLAGS+=$(PTHREAD_CFLAGS) +LDFLAGS+=$(PTHREAD_LDFLAGS) +else +DEFS+=-DNO_THREADS +endif + CXXFLAGS+=$(OPTFLAGS) -std=gnu++17 -fvisibility=hidden -fPIC -MMD $(CC_MARCH) $(INCS) $(GCFLAGS) $(UBFLAGS) $(DEFS) $(WARNS) CFLAGS+=$(OPTFLAGS) -std=gnu99 -fvisibility=hidden -fPIC -MMD $(CC_MARCH) $(INCS) $(GCFLAGS) $(UBFLAGS) $(DEFS) $(WARNS) LDFLAGS+=$(UBFLAGS) @@ -276,8 +286,8 @@ $(error invalid value for COVERAGE_TOOLCHAIN: $(COVERAGE_TOOLCHAIN)) endif endif -CXXFLAGS+=$(MYCXXFLAGS) -CFLAGS+=$(MYCFLAGS) +CXXFLAGS+=$(MYCXXFLAGS) $(MYDEFS) +CFLAGS+=$(MYCFLAGS) $(MYDEFS) LDFLAGS+=$(MYLDFLAGS) SOLDFLAGS+=$(MYSOLDFLAGS) LIBLDFLAGS+=$(MYLIBLDFLAGS) diff --git a/src/machine-c-api.cpp b/src/machine-c-api.cpp index 8724ca54a..fc80358d2 100644 --- a/src/machine-c-api.cpp +++ b/src/machine-c-api.cpp @@ -17,7 +17,7 @@ #include #include #include -#include +#include #include #include #include @@ -66,8 +66,6 @@ int cm_result_failure(char **err_msg) try { throw; } catch (std::exception &e) { return CM_ERROR_LENGTH_ERROR; } catch (std::out_of_range &ex) { return CM_ERROR_OUT_OF_RANGE; - } catch (std::future_error &ex) { - return CM_ERROR_FUTURE_ERROR; } catch (std::logic_error &ex) { return CM_ERROR_LOGIC_ERROR; } catch (std::bad_optional_access &ex) { diff --git a/src/machine-c-api.h b/src/machine-c-api.h index c290b60d9..4cae7c3cf 100644 --- a/src/machine-c-api.h +++ b/src/machine-c-api.h @@ -55,7 +55,6 @@ typedef enum { // NOLINT(modernize-use-using) CM_ERROR_DOMAIN_ERROR, CM_ERROR_LENGTH_ERROR, CM_ERROR_OUT_OF_RANGE, - CM_ERROR_FUTURE_ERROR, CM_ERROR_LOGIC_ERROR, CM_LOGIC_ERROR_END, // Bad optional access error diff --git a/src/machine.cpp b/src/machine.cpp index 05cd1e2a5..6de5ee927 100644 --- a/src/machine.cpp +++ b/src/machine.cpp @@ -15,15 +15,11 @@ // #include -#include #include #include #include -#include #include #include -#include -#include #include "clint-factory.h" #include "dtb.h" @@ -1448,7 +1444,7 @@ bool machine::verify_dirty_page_maps(void) const { } static uint64_t get_task_concurrency(uint64_t value) { - const uint64_t concurrency = value > 0 ? value : std::max(std::thread::hardware_concurrency(), 1U); + const uint64_t concurrency = value > 0 ? value : std::max(os_get_concurrency(), UINT64_C(1)); return std::min(concurrency, static_cast(THREADS_MAX)); } @@ -1467,68 +1463,55 @@ bool machine::update_merkle_tree(void) const { // For each PMA, we launch as many threads (n) as defined on concurrency // runtime config or as the hardware supports. const uint64_t n = get_task_concurrency(m_r.concurrency.update_merkle_tree); - // The update_page_node_hash function in the machine_merkle_tree is not thread - // safe, so we protect it with a mutex - std::mutex updatex; - // Each thread is launched as a future, whose value tells if the - // computation succeeded - std::vector> futures; - futures.reserve(n); - for (uint64_t j = 0; j < n; ++j) { - futures.emplace_back(std::async((n == 1) ? std::launch::deferred : std::launch::async, - [&](int j) -> bool { - auto scratch = unique_calloc(PMA_PAGE_SIZE, std::nothrow_t{}); - if (!scratch) { - return false; - } - machine_merkle_tree::hasher_type h; - // Thread j is responsible for page i if i % n == j. - for (uint64_t i = j; i < pages_in_range; i += n) { - const uint64_t page_start_in_range = i * PMA_PAGE_SIZE; - const uint64_t page_address = pma->get_start() + page_start_in_range; - const unsigned char *page_data = nullptr; - // Skip any clean pages - if (!pma->is_page_marked_dirty(page_start_in_range)) { - continue; - } - // If the peek failed, or if it returned a page for update but - // we failed updating it, the entire process failed - if (!peek(*pma, *this, page_start_in_range, &page_data, scratch.get())) { + const bool succeeded = os_parallel_for(n, [&](int j, const parallel_for_mutex &mutex) -> bool { + auto scratch = unique_calloc(PMA_PAGE_SIZE, std::nothrow_t{}); + if (!scratch) { + return false; + } + machine_merkle_tree::hasher_type h; + // Thread j is responsible for page i if i % n == j. + for (uint64_t i = j; i < pages_in_range; i += n) { + const uint64_t page_start_in_range = i * PMA_PAGE_SIZE; + const uint64_t page_address = pma->get_start() + page_start_in_range; + const unsigned char *page_data = nullptr; + // Skip any clean pages + if (!pma->is_page_marked_dirty(page_start_in_range)) { + continue; + } + // If the peek failed, or if it returned a page for update but + // we failed updating it, the entire process failed + if (!peek(*pma, *this, page_start_in_range, &page_data, scratch.get())) { + return false; + } + if (page_data) { + const bool is_pristine = std::all_of(page_data, page_data + PMA_PAGE_SIZE, + [](unsigned char pp) -> bool { return pp == '\0'; }); + + if (is_pristine) { + // The update_page_node_hash function in the machine_merkle_tree is not thread + // safe, so we protect it with a mutex + const parallel_for_mutex_guard lock(mutex); + if (!m_t.update_page_node_hash(page_address, + machine_merkle_tree::get_pristine_hash(machine_merkle_tree::get_log2_page_size()))) { return false; } - if (page_data) { - const bool is_pristine = std::all_of(page_data, page_data + PMA_PAGE_SIZE, - [](unsigned char pp) -> bool { return pp == '\0'; }); - - if (is_pristine) { - const std::lock_guard lock(updatex); - if (!m_t.update_page_node_hash(page_address, - machine_merkle_tree::get_pristine_hash( - machine_merkle_tree::get_log2_page_size()))) { - return false; - } - } else { - hash_type hash; - m_t.get_page_node_hash(h, page_data, hash); - { - const std::lock_guard lock(updatex); - if (!m_t.update_page_node_hash(page_address, hash)) { - return false; - } - } + } else { + hash_type hash; + m_t.get_page_node_hash(h, page_data, hash); + { + // The update_page_node_hash function in the machine_merkle_tree is not thread + // safe, so we protect it with a mutex + const parallel_for_mutex_guard lock(mutex); + if (!m_t.update_page_node_hash(page_address, hash)) { + return false; } } } - return true; - }, - j)); - } - // Check if any thread failed - bool succeeded = true; - for (auto &f : futures) { - succeeded = succeeded && f.get(); - } - // If so, we also failed + } + } + return true; + }); + // If any thread failed, we also failed if (!succeeded) { m_t.end_update(gh); return false; diff --git a/src/os.cpp b/src/os.cpp index d302c9b27..d970f5dd9 100644 --- a/src/os.cpp +++ b/src/os.cpp @@ -34,6 +34,10 @@ #define HAVE_MKDIR #endif +#if !defined(NO_THREADS) +#define HAVE_THREADS +#endif + #include #include #include @@ -45,6 +49,12 @@ #include "os.h" #include "unique-c-ptr.h" +#ifdef HAVE_THREADS +#include +#include +#include +#endif + #if defined(HAVE_TTY) || defined(HAVE_MMAP) || defined(HAVE_TERMIOS) || defined(_WIN32) #include // open #endif @@ -514,4 +524,40 @@ int64_t os_now_us() { return static_cast(std::chrono::duration_cast(end - start).count()); } +uint64_t os_get_concurrency() { +#ifdef HAVE_THREADS + return std::thread::hardware_concurrency(); +#else + return 1; +#endif +} + +bool os_parallel_for(uint64_t n, const std::function &task) { +#ifdef HAVE_THREADS + if (n > 1) { + std::mutex mutex; + const parallel_for_mutex for_mutex = {[&] { mutex.lock(); }, [&] { mutex.unlock(); }}; + std::vector> futures; + futures.reserve(n); + for (uint64_t j = 0; j < n; ++j) { + futures.emplace_back(std::async(std::launch::async, task, j, for_mutex)); + } + // Check if any thread failed + bool succeeded = true; + for (auto &f : futures) { + succeeded = succeeded && f.get(); + } + // Return overall status + return succeeded; + } +#endif + // Run without extra threads when concurrency is 1 or as fallback + const parallel_for_mutex for_mutex{[] {}, [] {}}; + bool succeeded = true; + for (uint64_t j = 0; j < n; ++j) { + succeeded = succeeded && task(j, for_mutex); + } + return succeeded; +} + } // namespace cartesi diff --git a/src/os.h b/src/os.h index a6d67aeb0..7440ef85e 100644 --- a/src/os.h +++ b/src/os.h @@ -19,6 +19,7 @@ #include #include +#include /// \file /// \brief System-specific OS handling operations @@ -55,6 +56,38 @@ void os_unmap_file(unsigned char *host_memory, uint64_t length); /// \brief Get time elapsed since its first call with microsecond precision int64_t os_now_us(); +/// \brief Get the number of concurrent threads supported by the OS +uint64_t os_get_concurrency(); + +/// \brief Mutex for os_parallel_for() +struct parallel_for_mutex { + std::function lock; + std::function unlock; +}; + +/// \brief Mutex guard for os_parallel_for() +struct parallel_for_mutex_guard { + parallel_for_mutex_guard(const parallel_for_mutex &mutex) : mutex(mutex) { + mutex.lock(); + } + ~parallel_for_mutex_guard() { + mutex.unlock(); + } + + parallel_for_mutex_guard() = delete; + parallel_for_mutex_guard(const parallel_for_mutex_guard &) = default; + parallel_for_mutex_guard(parallel_for_mutex_guard &&) = default; + parallel_for_mutex_guard &operator=(const parallel_for_mutex_guard &) = delete; + parallel_for_mutex_guard &operator=(parallel_for_mutex_guard &&) = delete; + +private: + parallel_for_mutex mutex; +}; + +/// \brief Runs a for loop in parallel using up to n threads +/// \return True if all thread tasks succeeded +bool os_parallel_for(uint64_t n, const std::function &task); + } // namespace cartesi #endif