Skip to content

Commit

Permalink
refactor: make possible to compile libcartesi without threading support
Browse files Browse the repository at this point in the history
  • Loading branch information
edubart committed Nov 11, 2023
1 parent 98f2234 commit f1fdde4
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 74 deletions.
26 changes: 18 additions & 8 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ GRPC_CPP_PLUGIN=$(shell which grpc_cpp_plugin)
release?=no
sanitize?=no
coverage?=no
nothreads?=no

COVERAGE_TOOLCHAIN?=gcc
COVERAGE_OUTPUT_DIR?=coverage
Expand All @@ -53,13 +54,13 @@ ifeq ($(TARGET_OS),Darwin)
SOLDFLAGS=-bundle -undefined dynamic_lookup
LIBLDFLAGS=-dynamiclib -undefined dynamic_lookup
EXELDFLAGS=
PTHREAD_CFLAGS=
PTHREAD_LDFLAGS=-lpthread
CC=clang
CXX=clang++
AR_EXEC=libtool -static
INCS=

PTHREAD_LIB=-lpthread

ifeq ($(MACOSX_DEPLOYMENT_TARGET),)
export MACOSX_DEPLOYMENT_TARGET := $(shell sw_vers -productVersion | sed -r "s/([[:digit:]]+)\.([[:digit:]]+)\..+/\1.\2.0/")
endif
Expand Down Expand Up @@ -95,15 +96,16 @@ PROFILE_DATA=default.profdata
else

# Linux specific setup
SOLDFLAGS=-shared -fPIC -pthread $(GCLDFLAGS)
LIBLDFLAGS=$(SOLDFLAGS) -Wl,-z,defs
SOLDFLAGS=-shared -fPIC $(GCLDFLAGS)
LIBLDFLAGS=$(SOLDFLAGS)
EXELDFLAGS=$(GCLDFLAGS)
PTHREAD_CFLAGS=-pthread
PTHREAD_LDFLAGS=-pthread -lpthread
CC=gcc
CXX=g++
AR_EXEC=ar rcs
INCS=

PTHREAD_LIB=-lpthread
BOOST_INC=
GRPC_PROTOBUF_INC=$(shell pkg-config --cflags-only-I grpc++ protobuf)
GRPC_PROTOBUF_LIB=$(shell pkg-config --libs grpc++ protobuf)
Expand All @@ -125,7 +127,7 @@ LUACARTESI_GRPC_LIBS=$(GRPC_PROTOBUF_LIB)
LUACARTESI_JSONRPC_LIBS=
REMOTE_CARTESI_MACHINE_LIBS=$(GRPC_PROTOBUF_LIB)
JSONRPC_REMOTE_CARTESI_MACHINE_LIBS=
TEST_MACHINE_C_API_LIBS=$(GRPC_PROTOBUF_LIB) $(PTHREAD_LIB)
TEST_MACHINE_C_API_LIBS=$(GRPC_PROTOBUF_LIB)
HASH_LIBS=

#DEFS+= -DMT_ALL_DIRTY
Expand Down Expand Up @@ -249,6 +251,14 @@ EMPTY:=
SPACE:=$(EMPTY) $(EMPTY)
CLANG_TIDY_HEADER_FILTER=$(PWD)/($(subst $(SPACE),|,$(LINTER_HEADERS)))

ifeq ($(nothreads),no)
CFLAGS+=$(PTHREAD_CFLAGS)
CXXFLAGS+=$(PTHREAD_CFLAGS)
LDFLAGS+=$(PTHREAD_LDFLAGS)
else
DEFS+=-DNO_THREADS
endif

CXXFLAGS+=$(OPTFLAGS) -std=gnu++17 -fvisibility=hidden -fPIC -MMD $(CC_MARCH) $(INCS) $(GCFLAGS) $(UBFLAGS) $(DEFS) $(WARNS)
CFLAGS+=$(OPTFLAGS) -std=gnu99 -fvisibility=hidden -fPIC -MMD $(CC_MARCH) $(INCS) $(GCFLAGS) $(UBFLAGS) $(DEFS) $(WARNS)
LDFLAGS+=$(UBFLAGS)
Expand Down Expand Up @@ -276,8 +286,8 @@ $(error invalid value for COVERAGE_TOOLCHAIN: $(COVERAGE_TOOLCHAIN))
endif
endif

CXXFLAGS+=$(MYCXXFLAGS)
CFLAGS+=$(MYCFLAGS)
CXXFLAGS+=$(MYCXXFLAGS) $(MYDEFS)
CFLAGS+=$(MYCFLAGS) $(MYDEFS)
LDFLAGS+=$(MYLDFLAGS)
SOLDFLAGS+=$(MYSOLDFLAGS)
LIBLDFLAGS+=$(MYLIBLDFLAGS)
Expand Down
4 changes: 1 addition & 3 deletions src/machine-c-api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <any>
#include <cstring>
#include <exception>
#include <future>
#include <functional>
#include <ios>
#include <optional>
#include <regex>
Expand Down Expand Up @@ -66,8 +66,6 @@ int cm_result_failure(char **err_msg) try { throw; } catch (std::exception &e) {
return CM_ERROR_LENGTH_ERROR;
} catch (std::out_of_range &ex) {
return CM_ERROR_OUT_OF_RANGE;
} catch (std::future_error &ex) {
return CM_ERROR_FUTURE_ERROR;
} catch (std::logic_error &ex) {
return CM_ERROR_LOGIC_ERROR;
} catch (std::bad_optional_access &ex) {
Expand Down
1 change: 0 additions & 1 deletion src/machine-c-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ typedef enum { // NOLINT(modernize-use-using)
CM_ERROR_DOMAIN_ERROR,
CM_ERROR_LENGTH_ERROR,
CM_ERROR_OUT_OF_RANGE,
CM_ERROR_FUTURE_ERROR,
CM_ERROR_LOGIC_ERROR,
CM_LOGIC_ERROR_END,
// Bad optional access error
Expand Down
107 changes: 45 additions & 62 deletions src/machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,11 @@
//

#include <boost/range/adaptor/sliced.hpp>
#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <cstring>
#include <future>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <thread>

#include "clint-factory.h"
#include "dtb.h"
Expand Down Expand Up @@ -1448,7 +1444,7 @@ bool machine::verify_dirty_page_maps(void) const {
}

static uint64_t get_task_concurrency(uint64_t value) {
const uint64_t concurrency = value > 0 ? value : std::max(std::thread::hardware_concurrency(), 1U);
const uint64_t concurrency = value > 0 ? value : std::max(os_get_concurrency(), UINT64_C(1));
return std::min(concurrency, static_cast<uint64_t>(THREADS_MAX));
}

Expand All @@ -1467,68 +1463,55 @@ bool machine::update_merkle_tree(void) const {
// For each PMA, we launch as many threads (n) as defined on concurrency
// runtime config or as the hardware supports.
const uint64_t n = get_task_concurrency(m_r.concurrency.update_merkle_tree);
// The update_page_node_hash function in the machine_merkle_tree is not thread
// safe, so we protect it with a mutex
std::mutex updatex;
// Each thread is launched as a future, whose value tells if the
// computation succeeded
std::vector<std::future<bool>> futures;
futures.reserve(n);
for (uint64_t j = 0; j < n; ++j) {
futures.emplace_back(std::async((n == 1) ? std::launch::deferred : std::launch::async,
[&](int j) -> bool {
auto scratch = unique_calloc<unsigned char>(PMA_PAGE_SIZE, std::nothrow_t{});
if (!scratch) {
return false;
}
machine_merkle_tree::hasher_type h;
// Thread j is responsible for page i if i % n == j.
for (uint64_t i = j; i < pages_in_range; i += n) {
const uint64_t page_start_in_range = i * PMA_PAGE_SIZE;
const uint64_t page_address = pma->get_start() + page_start_in_range;
const unsigned char *page_data = nullptr;
// Skip any clean pages
if (!pma->is_page_marked_dirty(page_start_in_range)) {
continue;
}
// If the peek failed, or if it returned a page for update but
// we failed updating it, the entire process failed
if (!peek(*pma, *this, page_start_in_range, &page_data, scratch.get())) {
const bool succeeded = os_parallel_for(n, [&](int j, const parallel_for_mutex &mutex) -> bool {
auto scratch = unique_calloc<unsigned char>(PMA_PAGE_SIZE, std::nothrow_t{});
if (!scratch) {
return false;
}
machine_merkle_tree::hasher_type h;
// Thread j is responsible for page i if i % n == j.
for (uint64_t i = j; i < pages_in_range; i += n) {
const uint64_t page_start_in_range = i * PMA_PAGE_SIZE;
const uint64_t page_address = pma->get_start() + page_start_in_range;
const unsigned char *page_data = nullptr;
// Skip any clean pages
if (!pma->is_page_marked_dirty(page_start_in_range)) {
continue;
}
// If the peek failed, or if it returned a page for update but
// we failed updating it, the entire process failed
if (!peek(*pma, *this, page_start_in_range, &page_data, scratch.get())) {
return false;
}
if (page_data) {
const bool is_pristine = std::all_of(page_data, page_data + PMA_PAGE_SIZE,
[](unsigned char pp) -> bool { return pp == '\0'; });

if (is_pristine) {
// The update_page_node_hash function in the machine_merkle_tree is not thread
// safe, so we protect it with a mutex
const parallel_for_mutex_guard lock(mutex);
if (!m_t.update_page_node_hash(page_address,
machine_merkle_tree::get_pristine_hash(machine_merkle_tree::get_log2_page_size()))) {
return false;
}
if (page_data) {
const bool is_pristine = std::all_of(page_data, page_data + PMA_PAGE_SIZE,
[](unsigned char pp) -> bool { return pp == '\0'; });

if (is_pristine) {
const std::lock_guard<std::mutex> lock(updatex);
if (!m_t.update_page_node_hash(page_address,
machine_merkle_tree::get_pristine_hash(
machine_merkle_tree::get_log2_page_size()))) {
return false;
}
} else {
hash_type hash;
m_t.get_page_node_hash(h, page_data, hash);
{
const std::lock_guard<std::mutex> lock(updatex);
if (!m_t.update_page_node_hash(page_address, hash)) {
return false;
}
}
} else {
hash_type hash;
m_t.get_page_node_hash(h, page_data, hash);
{
// The update_page_node_hash function in the machine_merkle_tree is not thread
// safe, so we protect it with a mutex
const parallel_for_mutex_guard lock(mutex);
if (!m_t.update_page_node_hash(page_address, hash)) {
return false;
}
}
}
return true;
},
j));
}
// Check if any thread failed
bool succeeded = true;
for (auto &f : futures) {
succeeded = succeeded && f.get();
}
// If so, we also failed
}
}
return true;
});
// If any thread failed, we also failed
if (!succeeded) {
m_t.end_update(gh);
return false;
Expand Down
46 changes: 46 additions & 0 deletions src/os.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
#define HAVE_MKDIR
#endif

#if !defined(NO_THREADS)
#define HAVE_THREADS
#endif

#include <array>
#include <chrono>
#include <cstdint>
Expand All @@ -45,6 +49,12 @@
#include "os.h"
#include "unique-c-ptr.h"

#ifdef HAVE_THREADS
#include <future>
#include <mutex>
#include <thread>
#endif

#if defined(HAVE_TTY) || defined(HAVE_MMAP) || defined(HAVE_TERMIOS) || defined(_WIN32)
#include <fcntl.h> // open
#endif
Expand Down Expand Up @@ -514,4 +524,40 @@ int64_t os_now_us() {
return static_cast<int64_t>(std::chrono::duration_cast<std::chrono::microseconds>(end - start).count());
}

uint64_t os_get_concurrency() {
#ifdef HAVE_THREADS
return std::thread::hardware_concurrency();
#else
return 1;
#endif
}

bool os_parallel_for(uint64_t n, const std::function<bool(uint64_t j, const parallel_for_mutex &mutex)> &task) {
#ifdef HAVE_THREADS
if (n > 1) {
std::mutex mutex;
const parallel_for_mutex for_mutex = {[&] { mutex.lock(); }, [&] { mutex.unlock(); }};
std::vector<std::future<bool>> futures;
futures.reserve(n);
for (uint64_t j = 0; j < n; ++j) {
futures.emplace_back(std::async(std::launch::async, task, j, for_mutex));
}
// Check if any thread failed
bool succeeded = true;
for (auto &f : futures) {
succeeded = succeeded && f.get();
}
// Return overall status
return succeeded;
}
#endif
// Run without extra threads when concurrency is 1 or as fallback
const parallel_for_mutex for_mutex{[] {}, [] {}};
bool succeeded = true;
for (uint64_t j = 0; j < n; ++j) {
succeeded = succeeded && task(j, for_mutex);
}
return succeeded;
}

} // namespace cartesi
33 changes: 33 additions & 0 deletions src/os.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <cstddef>
#include <cstdint>
#include <functional>

/// \file
/// \brief System-specific OS handling operations
Expand Down Expand Up @@ -55,6 +56,38 @@ void os_unmap_file(unsigned char *host_memory, uint64_t length);
/// \brief Get time elapsed since its first call with microsecond precision
int64_t os_now_us();

/// \brief Get the number of concurrent threads supported by the OS
uint64_t os_get_concurrency();

/// \brief Mutex for os_parallel_for()
struct parallel_for_mutex {
std::function<void()> lock;
std::function<void()> unlock;
};

/// \brief Mutex guard for os_parallel_for()
struct parallel_for_mutex_guard {
parallel_for_mutex_guard(const parallel_for_mutex &mutex) : mutex(mutex) {
mutex.lock();
}
~parallel_for_mutex_guard() {
mutex.unlock();
}

parallel_for_mutex_guard() = delete;
parallel_for_mutex_guard(const parallel_for_mutex_guard &) = default;
parallel_for_mutex_guard(parallel_for_mutex_guard &&) = default;
parallel_for_mutex_guard &operator=(const parallel_for_mutex_guard &) = delete;
parallel_for_mutex_guard &operator=(parallel_for_mutex_guard &&) = delete;

private:
parallel_for_mutex mutex;
};

/// \brief Runs a for loop in parallel using up to n threads
/// \return True if all thread tasks succeeded
bool os_parallel_for(uint64_t n, const std::function<bool(uint64_t j, const parallel_for_mutex &mutex)> &task);

} // namespace cartesi

#endif

0 comments on commit f1fdde4

Please sign in to comment.