From 8f6f9dfaef90f64bfce0ecea81893a6561e09b01 Mon Sep 17 00:00:00 2001 From: Gianmatteo Palmieri Date: Thu, 29 Aug 2024 09:34:22 +0200 Subject: [PATCH] fix(libsinsp): address reviewer thread pool suggestions Signed-off-by: Gianmatteo Palmieri Co-authored-by: Jason Dellaluce --- .github/workflows/ci.yml | 10 ++-- cmake/modules/CompilerFlags.cmake | 4 ++ cmake/modules/libsinsp.cmake | 12 +++-- userspace/libsinsp/CMakeLists.txt | 11 +++- userspace/libsinsp/plugin.cpp | 44 ++++++++-------- userspace/libsinsp/plugin.h | 13 +++-- userspace/libsinsp/sinsp.cpp | 23 ++++++++- userspace/libsinsp/sinsp.h | 9 ++-- userspace/libsinsp/test/plugins.ut.cpp | 20 ++++---- userspace/libsinsp/test/plugins/routines.cpp | 16 ++++-- userspace/libsinsp/test/thread_pool.ut.cpp | 26 +++++++--- userspace/libsinsp/thread_pool.h | 35 +------------ .../{thread_pool.cpp => thread_pool_bs.cpp} | 31 ++++++++---- userspace/libsinsp/thread_pool_bs.h | 50 +++++++++++++++++++ userspace/plugin/plugin_api.h | 14 ++++-- 15 files changed, 208 insertions(+), 110 deletions(-) rename userspace/libsinsp/{thread_pool.cpp => thread_pool_bs.cpp} (66%) create mode 100644 userspace/libsinsp/thread_pool_bs.h diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c2fce5cf5e..2957148a63 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -64,7 +64,7 @@ jobs: UBSAN_OPTIONS: print_stacktrace=1 run: | mkdir -p build - cd build && cmake ${{ matrix.cmake_opts }} ../ + cd build && cmake ${{ matrix.cmake_opts }} -DENABLE_THREAD_POOL=ON ../ KERNELDIR=/lib/modules/$(ls /lib/modules)/build make -j4 make run-unit-tests @@ -90,7 +90,7 @@ jobs: - name: Build and test ๐Ÿ—๏ธ๐Ÿงช run: | mkdir -p build - cd build && cmake -DBUILD_BPF=On -DBUILD_DRIVER=Off -DUSE_BUNDLED_DEPS=On -DUSE_BUNDLED_LIBELF=Off -DUSE_SHARED_LIBELF=Off -DBUILD_LIBSCAP_MODERN_BPF=ON -DMUSL_OPTIMIZED_BUILD=On ../ + cd build && cmake -DBUILD_BPF=On -DBUILD_DRIVER=Off -DUSE_BUNDLED_DEPS=On -DUSE_BUNDLED_LIBELF=Off -DUSE_SHARED_LIBELF=Off -DBUILD_LIBSCAP_MODERN_BPF=ON -DMUSL_OPTIMIZED_BUILD=On -DENABLE_THREAD_POOL=ON ../ make run-unit-tests -j4 build-shared-libs-linux-amd64: @@ -115,7 +115,7 @@ jobs: - name: Build and test ๐Ÿ—๏ธ๐Ÿงช run: | mkdir -p build - cd build && cmake -DBUILD_SHARED_LIBS=True -DUSE_BUNDLED_DEPS=False -DMINIMAL_BUILD=True -DCMAKE_INSTALL_PREFIX=/tmp/libs-test ../ + cd build && cmake -DBUILD_SHARED_LIBS=True -DUSE_BUNDLED_DEPS=False -DMINIMAL_BUILD=True -DCMAKE_INSTALL_PREFIX=/tmp/libs-test -DENABLE_THREAD_POOL=ON ../ make -j4 make run-unit-tests @@ -155,7 +155,7 @@ jobs: - name: Build and test ๐Ÿ—๏ธ๐Ÿงช run: | mkdir -p build - cd build && cmake -DUSE_BUNDLED_DEPS=ON -DCMAKE_BUILD_TYPE=Release -DCMAKE_MSVC_RUNTIME_LIBRARY=${{ matrix.crt }} -DCREATE_TEST_TARGETS=ON -DMINIMAL_BUILD=ON .. + cd build && cmake -DUSE_BUNDLED_DEPS=ON -DCMAKE_BUILD_TYPE=Release -DCMAKE_MSVC_RUNTIME_LIBRARY=${{ matrix.crt }} -DCREATE_TEST_TARGETS=ON -DMINIMAL_BUILD=ON -DENABLE_THREAD_POOL=ON .. cmake --build . --config Release --parallel 4 && make run-unit-tests || libsinsp\test\Release\unit-test-libsinsp.exe build-shared-libs-macos-amd64: @@ -174,7 +174,7 @@ jobs: - name: Build ๐Ÿ—๏ธ run: | mkdir -p build - cd build && cmake -DBUILD_SHARED_LIBS=True -DUSE_BUNDLED_DEPS=False -DUSE_BUNDLED_VALIJSON=ON -DUSE_BUNDLED_BS_THREADPOOL=ON -DCMAKE_BUILD_TYPE=Release -DCREATE_TEST_TARGETS=OFF -DMINIMAL_BUILD=ON -DCMAKE_INSTALL_PREFIX=/tmp/libs-test .. + cd build && cmake -DBUILD_SHARED_LIBS=True -DUSE_BUNDLED_DEPS=False -DUSE_BUNDLED_VALIJSON=ON -DUSE_BUNDLED_BS_THREADPOOL=ON -DENABLE_THREAD_POOL=ON -DCMAKE_BUILD_TYPE=Release -DCREATE_TEST_TARGETS=OFF -DMINIMAL_BUILD=ON -DCMAKE_INSTALL_PREFIX=/tmp/libs-test .. cmake --build . --config Release --parallel $(getconf _NPROCESSORS_ONLN) - name: Install diff --git a/cmake/modules/CompilerFlags.cmake b/cmake/modules/CompilerFlags.cmake index 9a1cdb9289..619874aaf7 100644 --- a/cmake/modules/CompilerFlags.cmake +++ b/cmake/modules/CompilerFlags.cmake @@ -77,6 +77,10 @@ if(NOT MSVC) set(FALCOSECURITY_LIBS_USERSPACE_LINK_FLAGS "${FALCOSECURITY_LIBS_USERSPACE_COMPILE_FLAGS};--coverage") endif() + if(ENABLE_THREAD_POOL) + set(FALCOSECURITY_LIBS_COMMON_FLAGS "${FALCOSECURITY_LIBS_COMMON_FLAGS} -DENABLE_THREAD_POOL") + endif() + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${FALCOSECURITY_LIBS_COMMON_FLAGS}") # we need also `-std=c++17` here beacuse `set(CMAKE_CXX_STANDARD 17)` is not enough to enforce c++17 # with some Cmake versions: https://github.com/falcosecurity/libs/pull/950 diff --git a/cmake/modules/libsinsp.cmake b/cmake/modules/libsinsp.cmake index 4c3e5101e1..815130de09 100644 --- a/cmake/modules/libsinsp.cmake +++ b/cmake/modules/libsinsp.cmake @@ -20,6 +20,7 @@ if(NOT LIBS_DIR) endif() option(USE_BUNDLED_DEPS "Enable bundled dependencies instead of using the system ones" ON) +option(ENABLE_THREAD_POOL "Enable inspector thread pool" OFF) if(DEFINED LIBSINSP_USER_AGENT) add_definitions(-DLIBSINSP_USER_AGENT="${LIBSINSP_USER_AGENT}") @@ -37,7 +38,10 @@ endif() include(jsoncpp) include(valijson) include(re2) -include(bs_threadpool) + +if(ENABLE_THREAD_POOL AND NOT EMSCRIPTEN) + include(bs_threadpool) +endif() set(LIBSINSP_INCLUDE_DIRS ${LIBS_DIR} ${LIBS_DIR}/userspace ${LIBSCAP_INCLUDE_DIRS} ${DRIVER_CONFIG_DIR}) @@ -55,8 +59,10 @@ list(APPEND LIBSINSP_INCLUDE_DIRS ${VALIJSON_ABSOLUTE_INCLUDE_DIR}) get_filename_component(RE2_ABSOLUTE_INCLUDE_DIR ${RE2_INCLUDE} ABSOLUTE) list(APPEND LIBSINSP_INCLUDE_DIRS ${RE2_ABSOLUTE_INCLUDE_DIR}) -get_filename_component(BS_THREADPOOL_ABSOLUTE_INCLUDE_DIR ${BS_THREADPOOL_INCLUDE} ABSOLUTE) -list(APPEND LIBSINSP_INCLUDE_DIRS ${BS_THREADPOOL_ABSOLUTE_INCLUDE_DIR}) +if (ENABLE_THREAD_POOL AND NOT EMSCRIPTEN) + get_filename_component(BS_THREADPOOL_ABSOLUTE_INCLUDE_DIR ${BS_THREADPOOL_INCLUDE} ABSOLUTE) + list(APPEND LIBSINSP_INCLUDE_DIRS ${BS_THREADPOOL_ABSOLUTE_INCLUDE_DIR}) +endif() if(NOT MINIMAL_BUILD AND NOT EMSCRIPTEN AND NOT APPLE) get_filename_component(CARES_ABSOLUTE_INCLUDE_DIR ${CARES_INCLUDE} ABSOLUTE) diff --git a/userspace/libsinsp/CMakeLists.txt b/userspace/libsinsp/CMakeLists.txt index 9159e68a34..f701a539a3 100644 --- a/userspace/libsinsp/CMakeLists.txt +++ b/userspace/libsinsp/CMakeLists.txt @@ -16,6 +16,7 @@ # option(USE_BUNDLED_DEPS "Enable bundled dependencies instead of using the system ones" ON) +option(ENABLE_THREAD_POOL "Enable inspector thread pool" OFF) if(NOT MSVC) add_definitions(-DHAVE_PWD_H) @@ -100,7 +101,6 @@ add_library(sinsp prefix_search.cpp sinsp_syslog.cpp threadinfo.cpp - thread_pool.cpp tuples.cpp sinsp.cpp token_bucket.cpp @@ -113,6 +113,13 @@ add_library(sinsp events/sinsp_events_ppm_sc.cpp ) +if (ENABLE_THREAD_POOL AND NOT EMSCRIPTEN) + target_sources(sinsp + PRIVATE + thread_pool_bs.cpp + ) +endif() + if(NOT WIN32 AND NOT APPLE) target_sources(sinsp PRIVATE procfs_utils.cpp sinsp_cgroup.cpp) endif() @@ -200,7 +207,7 @@ if(USE_BUNDLED_JSONCPP) add_dependencies(sinsp jsoncpp) endif() -if(USE_BUNDLED_BS_THREADPOOL) +if(ENABLE_THREAD_POOL AND USE_BUNDLED_BS_THREADPOOL) add_dependencies(sinsp bs_threadpool) endif() diff --git a/userspace/libsinsp/plugin.cpp b/userspace/libsinsp/plugin.cpp index 7eaf4d1faf..1a777b8c0a 100755 --- a/userspace/libsinsp/plugin.cpp +++ b/userspace/libsinsp/plugin.cpp @@ -826,7 +826,7 @@ thread_pool::routine_id_t sinsp_plugin::subscribe_routine(ss_plugin_routine_fn_t { if(!m_thread_pool) { - return static_cast(nullptr); + return reinterpret_cast(nullptr); } auto f = [this, routine_fn, routine_state]() -> bool { @@ -836,14 +836,14 @@ thread_pool::routine_id_t sinsp_plugin::subscribe_routine(ss_plugin_routine_fn_t return m_thread_pool->subscribe(f); } -void sinsp_plugin::unsubscribe_routine(thread_pool::routine_id_t routine_id) +bool sinsp_plugin::unsubscribe_routine(thread_pool::routine_id_t routine_id) { if(!m_thread_pool || !routine_id) { - return; + return false; } - m_thread_pool->unsubscribe(routine_id); + return m_thread_pool->unsubscribe(routine_id); } ss_plugin_routine_t* plugin_subscribe_routine(ss_plugin_owner_t* o, ss_plugin_routine_fn_t r, ss_plugin_routine_state_t* s) @@ -851,24 +851,29 @@ ss_plugin_routine_t* plugin_subscribe_routine(ss_plugin_owner_t* o, ss_plugin_ro auto t = static_cast(o); auto res = t->subscribe_routine(r, s); - return static_cast(res); + return reinterpret_cast(res); } -void plugin_unsubscribe_routine(ss_plugin_owner_t* o, ss_plugin_routine_t* r) +ss_plugin_rc plugin_unsubscribe_routine(ss_plugin_owner_t* o, ss_plugin_routine_t* r) { auto t = static_cast(o); - auto id = static_cast(r); + auto id = reinterpret_cast(r); - t->unsubscribe_routine(id); + return t->unsubscribe_routine(id) ? SS_PLUGIN_SUCCESS : SS_PLUGIN_FAILURE; } -void sinsp_plugin::capture_open() +bool sinsp_plugin::capture_open() { if(!m_inited) { throw sinsp_exception(std::string(s_not_init_err) + ": " + m_name); } + if(!m_handle->api.capture_open) + { + return false; + } + ss_plugin_routine_vtable routine_vtable; routine_vtable.subscribe = &plugin_subscribe_routine; routine_vtable.unsubscribe = &plugin_unsubscribe_routine; @@ -882,24 +887,26 @@ void sinsp_plugin::capture_open() in.owner = (ss_plugin_owner_t *) this; in.table_reader_ext = &table_reader_ext; in.table_writer_ext = &table_writer_ext; - in.routine = routine_vtable; + in.routine = &routine_vtable; sinsp_plugin::table_read_api(table_reader, table_reader_ext); sinsp_plugin::table_write_api(table_writer, table_writer_ext); - if(m_handle->api.capture_open) - { - m_handle->api.capture_open(m_state, &in); - } + return m_handle->api.capture_open(m_state, &in) == SS_PLUGIN_SUCCESS; } -void sinsp_plugin::capture_close() +bool sinsp_plugin::capture_close() { if(!m_inited) { throw sinsp_exception(std::string(s_not_init_err) + ": " + m_name); } + if(!m_handle->api.capture_close) + { + return false; + } + ss_plugin_routine_vtable routine_vtable; routine_vtable.subscribe = &plugin_subscribe_routine; routine_vtable.unsubscribe = &plugin_unsubscribe_routine; @@ -913,15 +920,12 @@ void sinsp_plugin::capture_close() in.owner = (ss_plugin_owner_t *) this; in.table_reader_ext = &table_reader_ext; in.table_writer_ext = &table_writer_ext; - in.routine = routine_vtable; + in.routine = &routine_vtable; sinsp_plugin::table_read_api(table_reader, table_reader_ext); sinsp_plugin::table_write_api(table_writer, table_writer_ext); - if(m_handle->api.capture_close) - { - m_handle->api.capture_close(m_state, &in); - } + return m_handle->api.capture_close(m_state, &in) == SS_PLUGIN_SUCCESS; } /** Event Source CAP **/ diff --git a/userspace/libsinsp/plugin.h b/userspace/libsinsp/plugin.h index 838151f853..9976b3f957 100755 --- a/userspace/libsinsp/plugin.h +++ b/userspace/libsinsp/plugin.h @@ -29,9 +29,14 @@ limitations under the License. #include #include #include -#include #include +#if defined(ENABLE_THREAD_POOL) && !defined(__EMSCRIPTEN__) +#include +#else +#include +#endif + /** * @brief An object-oriented representation of a plugin. */ @@ -172,10 +177,10 @@ class sinsp_plugin std::string get_init_schema(ss_plugin_schema_type& schema_type) const; bool set_config(const std::string& config); std::vector get_metrics() const; - void capture_open(); - void capture_close(); + bool capture_open(); + bool capture_close(); thread_pool::routine_id_t subscribe_routine(ss_plugin_routine_fn_t routine_fn, ss_plugin_routine_state_t* routine_state); - void unsubscribe_routine(thread_pool::routine_id_t routine_id); + bool unsubscribe_routine(thread_pool::routine_id_t routine_id); /** Event Sourcing **/ inline uint32_t id() const diff --git a/userspace/libsinsp/sinsp.cpp b/userspace/libsinsp/sinsp.cpp index e119da5b4b..b5ac697fbd 100644 --- a/userspace/libsinsp/sinsp.cpp +++ b/userspace/libsinsp/sinsp.cpp @@ -132,9 +132,12 @@ sinsp::sinsp(bool with_metrics) : m_table_registry = std::make_shared(); m_table_registry->add_table(m_thread_manager.get()); -#if !defined(__EMSCRIPTEN__) - m_thread_pool = std::make_shared(); +#if defined(ENABLE_THREAD_POOL) && !defined(__EMSCRIPTEN__) + m_thread_pool = std::make_shared(); +#else + m_thread_pool = nullptr; #endif + } sinsp::~sinsp() @@ -2167,3 +2170,19 @@ void sinsp::set_track_connection_status(bool enabled) m_parser->set_track_connection_status(enabled); } +std::shared_ptr sinsp::get_thread_pool() +{ + return m_thread_pool; +} + +bool sinsp::set_thread_pool(std::shared_ptr tpool) +{ + if(!m_thread_pool) + { + m_thread_pool = tpool; + return true; + } + + return false; +} + diff --git a/userspace/libsinsp/sinsp.h b/userspace/libsinsp/sinsp.h index 86752271bd..a6b1b9c808 100644 --- a/userspace/libsinsp/sinsp.h +++ b/userspace/libsinsp/sinsp.h @@ -1018,6 +1018,9 @@ class SINSP_PUBLIC sinsp : public capture_stats_source bool get_track_connection_status() const; inline void set_track_connection_status(bool enabled); + std::shared_ptr get_thread_pool(); + bool set_thread_pool(std::shared_ptr tpool); + /** * \brief Get a new timestamp. * @@ -1203,6 +1206,8 @@ class SINSP_PUBLIC sinsp : public capture_stats_source int32_t m_quantization_interval = -1; + std::shared_ptr m_thread_pool; + public: std::unique_ptr m_thread_manager; @@ -1404,10 +1409,6 @@ class SINSP_PUBLIC sinsp : public capture_stats_source // A registry that managers the state tables of this inspector std::shared_ptr m_table_registry; - // - // A thread pool capable of running non-blocking recurring routines - std::shared_ptr m_thread_pool; - sinsp_observer* m_observer{nullptr}; bool m_inited; diff --git a/userspace/libsinsp/test/plugins.ut.cpp b/userspace/libsinsp/test/plugins.ut.cpp index 01143a9458..8d95c52131 100644 --- a/userspace/libsinsp/test/plugins.ut.cpp +++ b/userspace/libsinsp/test/plugins.ut.cpp @@ -930,44 +930,46 @@ TEST_F(sinsp_with_test_input, plugin_metrics) #endif -#if !defined(__EMSCRIPTEN__) +#if defined(ENABLE_THREAD_POOL) && !defined(__EMSCRIPTEN__) TEST_F(sinsp_with_test_input, plugin_routines) { auto p = register_plugin(&m_inspector, get_plugin_api_sample_routines); open_inspector(); - ASSERT_NE(m_inspector.m_thread_pool, nullptr); + auto tp = m_inspector.get_thread_pool(); + + ASSERT_NE(tp, nullptr); // step #0: the plugins subscribes a routine on capture open - auto routines_num = m_inspector.m_thread_pool->routines_num(); + auto routines_num = tp->routines_num(); ASSERT_EQ(routines_num, 1); // step #1: the plugin subscribes another routine add_event_advance_ts(increasing_ts(), 0, PPME_SYSCALL_OPEN_E, 3, "/tmp/the_file", PPM_O_RDWR, 0); - routines_num = m_inspector.m_thread_pool->routines_num(); + routines_num = tp->routines_num(); ASSERT_EQ(routines_num, 2); // step #2: the plugin unsubscribes the previous routine add_event_advance_ts(increasing_ts(), 0, PPME_SYSCALL_OPEN_E, 3, "/tmp/the_file", PPM_O_RDWR, 0); - routines_num = m_inspector.m_thread_pool->routines_num(); + routines_num = tp->routines_num(); ASSERT_EQ(routines_num, 1); // step #3: the plugin subscribes another routine add_event_advance_ts(increasing_ts(), 0, PPME_SYSCALL_OPEN_E, 3, "/tmp/the_file", PPM_O_RDWR, 0); - routines_num = m_inspector.m_thread_pool->routines_num(); + routines_num = tp->routines_num(); ASSERT_EQ(routines_num, 2); // step #4: the plugin sets a flag that causes the previous routine to be unsubscibed add_event_advance_ts(increasing_ts(), 0, PPME_SYSCALL_OPEN_E, 3, "/tmp/the_file", PPM_O_RDWR, 0); std::this_thread::sleep_for(std::chrono::nanoseconds(1000)); //wait for a bit to let routine finish - routines_num = m_inspector.m_thread_pool->routines_num(); + routines_num = tp->routines_num(); ASSERT_EQ(routines_num, 1); // step: #5: the plugin doesn't unsubscribe the last routine, but the thread pool shuould unsubscribe it on capture close m_inspector.close(); - std::this_thread::sleep_for(std::chrono::nanoseconds(100));; //wait for a bit to let routine finish - routines_num = m_inspector.m_thread_pool->routines_num(); + std::this_thread::sleep_for(std::chrono::nanoseconds(100)); //wait for a bit to let routine finish + routines_num = tp->routines_num(); ASSERT_EQ(routines_num, 0); } diff --git a/userspace/libsinsp/test/plugins/routines.cpp b/userspace/libsinsp/test/plugins/routines.cpp index bb549c43b8..5a52480894 100644 --- a/userspace/libsinsp/test/plugins/routines.cpp +++ b/userspace/libsinsp/test/plugins/routines.cpp @@ -19,6 +19,9 @@ limitations under the License. #include "sample_table.h" #include "test_plugins.h" +#include +#include + struct plugin_state { std::string lasterr; @@ -26,7 +29,7 @@ struct plugin_state ss_plugin_routine_vtable routine_vtable; uint8_t step = 1; - bool flag = true; + std::atomic flag = true; ss_plugin_routine_t *routine; }; @@ -130,17 +133,20 @@ static ss_plugin_rc plugin_parse_event(ss_plugin_t *s, const ss_plugin_event_inp return SS_PLUGIN_SUCCESS; } -static void plugin_capture_open(ss_plugin_t* s, const ss_plugin_capture_listen_input* i) +static ss_plugin_rc plugin_capture_open(ss_plugin_t* s, const ss_plugin_capture_listen_input* i) { plugin_state *ps = (plugin_state *) s; - ps->routine_vtable = i->routine; + ps->routine_vtable.subscribe = i->routine->subscribe; + ps->routine_vtable.unsubscribe = i->routine->unsubscribe; ps->routine_vtable.subscribe(ps->owner, do_nothing, (ss_plugin_routine_state_t*)&ps->flag); + + return SS_PLUGIN_SUCCESS; } -static void plugin_capture_close(ss_plugin_t* s, const ss_plugin_capture_listen_input* i) +static ss_plugin_rc plugin_capture_close(ss_plugin_t* s, const ss_plugin_capture_listen_input* i) { - + return SS_PLUGIN_SUCCESS; } void get_plugin_api_sample_routines(plugin_api& out) diff --git a/userspace/libsinsp/test/thread_pool.ut.cpp b/userspace/libsinsp/test/thread_pool.ut.cpp index 7373943c6b..e0c78ee076 100644 --- a/userspace/libsinsp/test/thread_pool.ut.cpp +++ b/userspace/libsinsp/test/thread_pool.ut.cpp @@ -19,12 +19,12 @@ limitations under the License. #include #include -#if !defined(__EMSCRIPTEN__) +#if defined(ENABLE_THREAD_POOL) && !defined(__EMSCRIPTEN__) TEST_F(sinsp_with_test_input, thread_pool) { open_inspector(); - auto& tp = m_inspector.m_thread_pool; + auto tp = m_inspector.get_thread_pool(); ASSERT_NE(tp, nullptr); ASSERT_EQ(tp->routines_num(), 0); @@ -36,19 +36,26 @@ TEST_F(sinsp_with_test_input, thread_pool) }); // check if the routine has been subscribed - ASSERT_NE(r, nullptr); + ASSERT_NE(r, 0); ASSERT_EQ(tp->routines_num(), 1); // check if the routine has been unsubscribed - tp->unsubscribe(r); + auto res = tp->unsubscribe(r); ASSERT_EQ(tp->routines_num(), 0); + ASSERT_EQ(res, true); + + // unsuccessful unsubscribe + res = tp->unsubscribe(0); + ASSERT_EQ(res, false); // subscribe a routine that keeps running until a condition is met (returns false) - int count = 0; - r = tp->subscribe([&count] + std::atomic count = 0; + std::atomic routine_exited = false; + r = tp->subscribe([&count, &routine_exited] { if(count >= 1024) { + routine_exited = true; return false; } count++; @@ -57,8 +64,11 @@ TEST_F(sinsp_with_test_input, thread_pool) ASSERT_EQ(tp->routines_num(), 1); // the routine above keeps increasing a counter, until the counter reaches 1024 - // we give the routine enough time to finish, then we check if it has been unsubscribed - std::this_thread::sleep_for(std::chrono::seconds(1)); + // we wait for the routine to exit, then we check if it has been unsubscribed + while(!routine_exited) + { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } ASSERT_EQ(count, 1024); ASSERT_EQ(tp->routines_num(), 0); diff --git a/userspace/libsinsp/thread_pool.h b/userspace/libsinsp/thread_pool.h index 7c1556ba4d..1921e892f1 100644 --- a/userspace/libsinsp/thread_pool.h +++ b/userspace/libsinsp/thread_pool.h @@ -25,7 +25,7 @@ limitations under the License. class thread_pool { public: - using routine_id_t = std::function*; + using routine_id_t = uintptr_t; thread_pool() = default; @@ -47,7 +47,7 @@ class thread_pool * * \param id A routine handle. */ - virtual void unsubscribe(routine_id_t id) = 0; + virtual bool unsubscribe(routine_id_t id) = 0; /*! * \brief Unsubscribes all the subscribed routines and waits for the running ones to finish. @@ -58,35 +58,4 @@ class thread_pool * \return The count of currently subscribed routines. */ virtual size_t routines_num() = 0; -}; - -namespace BS { - class thread_pool; -}; - -class bs_thread_pool : public thread_pool -{ -public: - bs_thread_pool(size_t num_workers = 0); - - virtual ~bs_thread_pool() - { - purge(); - } - - thread_pool::routine_id_t subscribe(const std::function& func); - - void unsubscribe(thread_pool::routine_id_t id); - - void purge(); - - size_t routines_num(); - -private: - struct default_bs_tp_deleter { void operator()(BS::thread_pool* __ptr) const; }; - - void run_routine(std::shared_ptr> id); - - std::unique_ptr m_pool; - std::list>> m_routines; }; \ No newline at end of file diff --git a/userspace/libsinsp/thread_pool.cpp b/userspace/libsinsp/thread_pool_bs.cpp similarity index 66% rename from userspace/libsinsp/thread_pool.cpp rename to userspace/libsinsp/thread_pool_bs.cpp index 729c03b837..429f221099 100644 --- a/userspace/libsinsp/thread_pool.cpp +++ b/userspace/libsinsp/thread_pool_bs.cpp @@ -16,16 +16,16 @@ limitations under the License. */ -#include +#include #include -void bs_thread_pool::default_bs_tp_deleter::operator()(BS::thread_pool* __ptr) const +void thread_pool_bs::default_bs_tp_deleter::operator()(BS::thread_pool* __ptr) const { std::default_delete{}(__ptr); } -bs_thread_pool::bs_thread_pool(size_t num_workers): m_pool(nullptr), m_routines() +thread_pool_bs::thread_pool_bs(size_t num_workers): m_pool(nullptr), m_routines() { if (num_workers == 0) { @@ -37,24 +37,33 @@ bs_thread_pool::bs_thread_pool(size_t num_workers): m_pool(nullptr), m_routines( } } -bs_thread_pool::routine_id_t bs_thread_pool::subscribe(const std::function& func) +thread_pool_bs::routine_id_t thread_pool_bs::subscribe(const std::function& func) { m_routines.push_back(std::make_shared>(func)); auto& new_routine = m_routines.back(); run_routine(new_routine); - return static_cast(new_routine.get()); + return reinterpret_cast(new_routine.get()); } -void bs_thread_pool::unsubscribe(bs_thread_pool::routine_id_t id) +bool thread_pool_bs::unsubscribe(thread_pool_bs::routine_id_t id) { - m_routines.remove_if([id](const std::shared_ptr>& v) + bool removed = false; + m_routines.remove_if([id, &removed](const std::shared_ptr>& v) { - return v.get() == static_cast*>(id); + if(v.get() == reinterpret_cast*>(id)) + { + removed = true; + return true; + } + + return false; }); + + return removed; } -void bs_thread_pool::purge() +void thread_pool_bs::purge() { m_routines.clear(); @@ -62,12 +71,12 @@ void bs_thread_pool::purge() m_pool->wait(); } -size_t bs_thread_pool::routines_num() +size_t thread_pool_bs::routines_num() { return m_routines.size(); } -void bs_thread_pool::run_routine(std::shared_ptr> routine) +void thread_pool_bs::run_routine(std::shared_ptr> routine) { m_pool->detach_task([this, routine] { diff --git a/userspace/libsinsp/thread_pool_bs.h b/userspace/libsinsp/thread_pool_bs.h new file mode 100644 index 0000000000..1b9d82afb9 --- /dev/null +++ b/userspace/libsinsp/thread_pool_bs.h @@ -0,0 +1,50 @@ +// SPDX-License-Identifier: Apache-2.0 +/* +Copyright (C) 2024 The Falco Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + +#include + +namespace BS { + class thread_pool; +}; + +class thread_pool_bs : public thread_pool +{ +public: + thread_pool_bs(size_t num_workers = 0); + + virtual ~thread_pool_bs() + { + purge(); + } + + thread_pool::routine_id_t subscribe(const std::function& func); + + bool unsubscribe(thread_pool::routine_id_t id); + + void purge(); + + size_t routines_num(); + +private: + struct default_bs_tp_deleter { void operator()(BS::thread_pool* __ptr) const; }; + + void run_routine(std::shared_ptr> id); + + std::unique_ptr m_pool; + std::list>> m_routines; +}; \ No newline at end of file diff --git a/userspace/plugin/plugin_api.h b/userspace/plugin/plugin_api.h index 14a373e78b..b173efcf1a 100644 --- a/userspace/plugin/plugin_api.h +++ b/userspace/plugin/plugin_api.h @@ -424,7 +424,9 @@ typedef struct // Arguments: // - o: the plugin's owner // - r: the routine's handle - void (*unsubscribe)(ss_plugin_owner_t* o, ss_plugin_routine_t* r); + // + // Return value: A ss_plugin_rc with values SS_PLUGIN_SUCCESS or SS_PLUGIN_FAILURE. + ss_plugin_rc (*unsubscribe)(ss_plugin_owner_t* o, ss_plugin_routine_t* r); } ss_plugin_routine_vtable; // Input passed to the plugin when the framework start and stops the capture. @@ -437,7 +439,7 @@ typedef struct ss_plugin_capture_listen_input // // Vtable containing callbacks that can be used by the plugin // for subscribing and unsubscribing routines to the framework's thread pool. - ss_plugin_routine_vtable routine; + ss_plugin_routine_vtable* routine; // // Vtable for controlling a state table for read operations. ss_plugin_table_reader_vtable_ext* table_reader_ext; @@ -1048,7 +1050,9 @@ typedef struct // Arguments: // - s: the plugin state, returned by init(). Can be NULL. // - i: input containing vtables for performing table operations and subscribe/unsubscribe async routines - void (*capture_open)(ss_plugin_t* s, const ss_plugin_capture_listen_input* i); + // + // Return value: A ss_plugin_rc with values SS_PLUGIN_SUCCESS or SS_PLUGIN_FAILURE. + ss_plugin_rc (*capture_open)(ss_plugin_t* s, const ss_plugin_capture_listen_input* i); // // Called by the framework when the event capture closes. @@ -1057,7 +1061,9 @@ typedef struct // Arguments: // - s: the plugin state, returned by init(). Can be NULL. // - i: input containing vtables for performing table operations and subscribe/unsubscribe async routines - void (*capture_close)(ss_plugin_t* s, const ss_plugin_capture_listen_input* i); + // + // Return value: A ss_plugin_rc with values SS_PLUGIN_SUCCESS or SS_PLUGIN_FAILURE. + ss_plugin_rc (*capture_close)(ss_plugin_t* s, const ss_plugin_capture_listen_input* i); }; } plugin_api;