Skip to content

Commit

Permalink
fix(libsinsp): address reviewer thread pool suggestions
Browse files Browse the repository at this point in the history
Signed-off-by: Gianmatteo Palmieri <[email protected]>
Co-authored-by: Jason Dellaluce <[email protected]>
  • Loading branch information
2 people authored and poiana committed Sep 5, 2024
1 parent e0ec81c commit 8f6f9df
Show file tree
Hide file tree
Showing 15 changed files with 208 additions and 110 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
UBSAN_OPTIONS: print_stacktrace=1
run: |
mkdir -p build
cd build && cmake ${{ matrix.cmake_opts }} ../
cd build && cmake ${{ matrix.cmake_opts }} -DENABLE_THREAD_POOL=ON ../
KERNELDIR=/lib/modules/$(ls /lib/modules)/build make -j4
make run-unit-tests
Expand All @@ -90,7 +90,7 @@ jobs:
- name: Build and test 🏗️🧪
run: |
mkdir -p build
cd build && cmake -DBUILD_BPF=On -DBUILD_DRIVER=Off -DUSE_BUNDLED_DEPS=On -DUSE_BUNDLED_LIBELF=Off -DUSE_SHARED_LIBELF=Off -DBUILD_LIBSCAP_MODERN_BPF=ON -DMUSL_OPTIMIZED_BUILD=On ../
cd build && cmake -DBUILD_BPF=On -DBUILD_DRIVER=Off -DUSE_BUNDLED_DEPS=On -DUSE_BUNDLED_LIBELF=Off -DUSE_SHARED_LIBELF=Off -DBUILD_LIBSCAP_MODERN_BPF=ON -DMUSL_OPTIMIZED_BUILD=On -DENABLE_THREAD_POOL=ON ../
make run-unit-tests -j4
build-shared-libs-linux-amd64:
Expand All @@ -115,7 +115,7 @@ jobs:
- name: Build and test 🏗️🧪
run: |
mkdir -p build
cd build && cmake -DBUILD_SHARED_LIBS=True -DUSE_BUNDLED_DEPS=False -DMINIMAL_BUILD=True -DCMAKE_INSTALL_PREFIX=/tmp/libs-test ../
cd build && cmake -DBUILD_SHARED_LIBS=True -DUSE_BUNDLED_DEPS=False -DMINIMAL_BUILD=True -DCMAKE_INSTALL_PREFIX=/tmp/libs-test -DENABLE_THREAD_POOL=ON ../
make -j4
make run-unit-tests
Expand Down Expand Up @@ -155,7 +155,7 @@ jobs:
- name: Build and test 🏗️🧪
run: |
mkdir -p build
cd build && cmake -DUSE_BUNDLED_DEPS=ON -DCMAKE_BUILD_TYPE=Release -DCMAKE_MSVC_RUNTIME_LIBRARY=${{ matrix.crt }} -DCREATE_TEST_TARGETS=ON -DMINIMAL_BUILD=ON ..
cd build && cmake -DUSE_BUNDLED_DEPS=ON -DCMAKE_BUILD_TYPE=Release -DCMAKE_MSVC_RUNTIME_LIBRARY=${{ matrix.crt }} -DCREATE_TEST_TARGETS=ON -DMINIMAL_BUILD=ON -DENABLE_THREAD_POOL=ON ..
cmake --build . --config Release --parallel 4 && make run-unit-tests || libsinsp\test\Release\unit-test-libsinsp.exe
build-shared-libs-macos-amd64:
Expand All @@ -174,7 +174,7 @@ jobs:
- name: Build 🏗️
run: |
mkdir -p build
cd build && cmake -DBUILD_SHARED_LIBS=True -DUSE_BUNDLED_DEPS=False -DUSE_BUNDLED_VALIJSON=ON -DUSE_BUNDLED_BS_THREADPOOL=ON -DCMAKE_BUILD_TYPE=Release -DCREATE_TEST_TARGETS=OFF -DMINIMAL_BUILD=ON -DCMAKE_INSTALL_PREFIX=/tmp/libs-test ..
cd build && cmake -DBUILD_SHARED_LIBS=True -DUSE_BUNDLED_DEPS=False -DUSE_BUNDLED_VALIJSON=ON -DUSE_BUNDLED_BS_THREADPOOL=ON -DENABLE_THREAD_POOL=ON -DCMAKE_BUILD_TYPE=Release -DCREATE_TEST_TARGETS=OFF -DMINIMAL_BUILD=ON -DCMAKE_INSTALL_PREFIX=/tmp/libs-test ..
cmake --build . --config Release --parallel $(getconf _NPROCESSORS_ONLN)
- name: Install
Expand Down
4 changes: 4 additions & 0 deletions cmake/modules/CompilerFlags.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ if(NOT MSVC)
set(FALCOSECURITY_LIBS_USERSPACE_LINK_FLAGS "${FALCOSECURITY_LIBS_USERSPACE_COMPILE_FLAGS};--coverage")
endif()

if(ENABLE_THREAD_POOL)
set(FALCOSECURITY_LIBS_COMMON_FLAGS "${FALCOSECURITY_LIBS_COMMON_FLAGS} -DENABLE_THREAD_POOL")
endif()

set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${FALCOSECURITY_LIBS_COMMON_FLAGS}")
# we need also `-std=c++17` here beacuse `set(CMAKE_CXX_STANDARD 17)` is not enough to enforce c++17
# with some Cmake versions: https://github.com/falcosecurity/libs/pull/950
Expand Down
12 changes: 9 additions & 3 deletions cmake/modules/libsinsp.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ if(NOT LIBS_DIR)
endif()

option(USE_BUNDLED_DEPS "Enable bundled dependencies instead of using the system ones" ON)
option(ENABLE_THREAD_POOL "Enable inspector thread pool" OFF)

if(DEFINED LIBSINSP_USER_AGENT)
add_definitions(-DLIBSINSP_USER_AGENT="${LIBSINSP_USER_AGENT}")
Expand All @@ -37,7 +38,10 @@ endif()
include(jsoncpp)
include(valijson)
include(re2)
include(bs_threadpool)

if(ENABLE_THREAD_POOL AND NOT EMSCRIPTEN)
include(bs_threadpool)
endif()

set(LIBSINSP_INCLUDE_DIRS ${LIBS_DIR} ${LIBS_DIR}/userspace ${LIBSCAP_INCLUDE_DIRS} ${DRIVER_CONFIG_DIR})

Expand All @@ -55,8 +59,10 @@ list(APPEND LIBSINSP_INCLUDE_DIRS ${VALIJSON_ABSOLUTE_INCLUDE_DIR})
get_filename_component(RE2_ABSOLUTE_INCLUDE_DIR ${RE2_INCLUDE} ABSOLUTE)
list(APPEND LIBSINSP_INCLUDE_DIRS ${RE2_ABSOLUTE_INCLUDE_DIR})

get_filename_component(BS_THREADPOOL_ABSOLUTE_INCLUDE_DIR ${BS_THREADPOOL_INCLUDE} ABSOLUTE)
list(APPEND LIBSINSP_INCLUDE_DIRS ${BS_THREADPOOL_ABSOLUTE_INCLUDE_DIR})
if (ENABLE_THREAD_POOL AND NOT EMSCRIPTEN)
get_filename_component(BS_THREADPOOL_ABSOLUTE_INCLUDE_DIR ${BS_THREADPOOL_INCLUDE} ABSOLUTE)
list(APPEND LIBSINSP_INCLUDE_DIRS ${BS_THREADPOOL_ABSOLUTE_INCLUDE_DIR})
endif()

if(NOT MINIMAL_BUILD AND NOT EMSCRIPTEN AND NOT APPLE)
get_filename_component(CARES_ABSOLUTE_INCLUDE_DIR ${CARES_INCLUDE} ABSOLUTE)
Expand Down
11 changes: 9 additions & 2 deletions userspace/libsinsp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#

option(USE_BUNDLED_DEPS "Enable bundled dependencies instead of using the system ones" ON)
option(ENABLE_THREAD_POOL "Enable inspector thread pool" OFF)

if(NOT MSVC)
add_definitions(-DHAVE_PWD_H)
Expand Down Expand Up @@ -100,7 +101,6 @@ add_library(sinsp
prefix_search.cpp
sinsp_syslog.cpp
threadinfo.cpp
thread_pool.cpp
tuples.cpp
sinsp.cpp
token_bucket.cpp
Expand All @@ -113,6 +113,13 @@ add_library(sinsp
events/sinsp_events_ppm_sc.cpp
)

if (ENABLE_THREAD_POOL AND NOT EMSCRIPTEN)
target_sources(sinsp
PRIVATE
thread_pool_bs.cpp
)
endif()

if(NOT WIN32 AND NOT APPLE)
target_sources(sinsp PRIVATE procfs_utils.cpp sinsp_cgroup.cpp)
endif()
Expand Down Expand Up @@ -200,7 +207,7 @@ if(USE_BUNDLED_JSONCPP)
add_dependencies(sinsp jsoncpp)
endif()

if(USE_BUNDLED_BS_THREADPOOL)
if(ENABLE_THREAD_POOL AND USE_BUNDLED_BS_THREADPOOL)
add_dependencies(sinsp bs_threadpool)
endif()

Expand Down
44 changes: 24 additions & 20 deletions userspace/libsinsp/plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ thread_pool::routine_id_t sinsp_plugin::subscribe_routine(ss_plugin_routine_fn_t
{
if(!m_thread_pool)
{
return static_cast<thread_pool::routine_id_t>(nullptr);
return reinterpret_cast<thread_pool::routine_id_t>(nullptr);
}

auto f = [this, routine_fn, routine_state]() -> bool {
Expand All @@ -836,39 +836,44 @@ thread_pool::routine_id_t sinsp_plugin::subscribe_routine(ss_plugin_routine_fn_t
return m_thread_pool->subscribe(f);
}

void sinsp_plugin::unsubscribe_routine(thread_pool::routine_id_t routine_id)
bool sinsp_plugin::unsubscribe_routine(thread_pool::routine_id_t routine_id)
{
if(!m_thread_pool || !routine_id)
{
return;
return false;
}

m_thread_pool->unsubscribe(routine_id);
return m_thread_pool->unsubscribe(routine_id);
}

ss_plugin_routine_t* plugin_subscribe_routine(ss_plugin_owner_t* o, ss_plugin_routine_fn_t r, ss_plugin_routine_state_t* s)
{
auto t = static_cast<sinsp_plugin*>(o);
auto res = t->subscribe_routine(r, s);

return static_cast<ss_plugin_routine_t*>(res);
return reinterpret_cast<ss_plugin_routine_t*>(res);
}

void plugin_unsubscribe_routine(ss_plugin_owner_t* o, ss_plugin_routine_t* r)
ss_plugin_rc plugin_unsubscribe_routine(ss_plugin_owner_t* o, ss_plugin_routine_t* r)
{
auto t = static_cast<sinsp_plugin*>(o);
auto id = static_cast<thread_pool::routine_id_t>(r);
auto id = reinterpret_cast<thread_pool::routine_id_t>(r);

t->unsubscribe_routine(id);
return t->unsubscribe_routine(id) ? SS_PLUGIN_SUCCESS : SS_PLUGIN_FAILURE;
}

void sinsp_plugin::capture_open()
bool sinsp_plugin::capture_open()
{
if(!m_inited)
{
throw sinsp_exception(std::string(s_not_init_err) + ": " + m_name);
}

if(!m_handle->api.capture_open)
{
return false;
}

ss_plugin_routine_vtable routine_vtable;
routine_vtable.subscribe = &plugin_subscribe_routine;
routine_vtable.unsubscribe = &plugin_unsubscribe_routine;
Expand All @@ -882,24 +887,26 @@ void sinsp_plugin::capture_open()
in.owner = (ss_plugin_owner_t *) this;
in.table_reader_ext = &table_reader_ext;
in.table_writer_ext = &table_writer_ext;
in.routine = routine_vtable;
in.routine = &routine_vtable;

sinsp_plugin::table_read_api(table_reader, table_reader_ext);
sinsp_plugin::table_write_api(table_writer, table_writer_ext);

if(m_handle->api.capture_open)
{
m_handle->api.capture_open(m_state, &in);
}
return m_handle->api.capture_open(m_state, &in) == SS_PLUGIN_SUCCESS;
}

void sinsp_plugin::capture_close()
bool sinsp_plugin::capture_close()
{
if(!m_inited)
{
throw sinsp_exception(std::string(s_not_init_err) + ": " + m_name);
}

if(!m_handle->api.capture_close)
{
return false;
}

ss_plugin_routine_vtable routine_vtable;
routine_vtable.subscribe = &plugin_subscribe_routine;
routine_vtable.unsubscribe = &plugin_unsubscribe_routine;
Expand All @@ -913,15 +920,12 @@ void sinsp_plugin::capture_close()
in.owner = (ss_plugin_owner_t *) this;
in.table_reader_ext = &table_reader_ext;
in.table_writer_ext = &table_writer_ext;
in.routine = routine_vtable;
in.routine = &routine_vtable;

sinsp_plugin::table_read_api(table_reader, table_reader_ext);
sinsp_plugin::table_write_api(table_writer, table_writer_ext);

if(m_handle->api.capture_close)
{
m_handle->api.capture_close(m_state, &in);
}
return m_handle->api.capture_close(m_state, &in) == SS_PLUGIN_SUCCESS;
}

/** Event Source CAP **/
Expand Down
13 changes: 9 additions & 4 deletions userspace/libsinsp/plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,14 @@ limitations under the License.
#include <libsinsp/version.h>
#include <libsinsp/events/sinsp_events.h>
#include <libsinsp/state/table_registry.h>
#include <libsinsp/thread_pool.h>
#include <plugin/plugin_loader.h>

#if defined(ENABLE_THREAD_POOL) && !defined(__EMSCRIPTEN__)
#include <libsinsp/thread_pool_bs.h>
#else
#include <libsinsp/thread_pool.h>
#endif

/**
* @brief An object-oriented representation of a plugin.
*/
Expand Down Expand Up @@ -172,10 +177,10 @@ class sinsp_plugin
std::string get_init_schema(ss_plugin_schema_type& schema_type) const;
bool set_config(const std::string& config);
std::vector<metrics_v2> get_metrics() const;
void capture_open();
void capture_close();
bool capture_open();
bool capture_close();
thread_pool::routine_id_t subscribe_routine(ss_plugin_routine_fn_t routine_fn, ss_plugin_routine_state_t* routine_state);
void unsubscribe_routine(thread_pool::routine_id_t routine_id);
bool unsubscribe_routine(thread_pool::routine_id_t routine_id);

/** Event Sourcing **/
inline uint32_t id() const
Expand Down
23 changes: 21 additions & 2 deletions userspace/libsinsp/sinsp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,12 @@ sinsp::sinsp(bool with_metrics) :
m_table_registry = std::make_shared<libsinsp::state::table_registry>();
m_table_registry->add_table(m_thread_manager.get());

#if !defined(__EMSCRIPTEN__)
m_thread_pool = std::make_shared<bs_thread_pool>();
#if defined(ENABLE_THREAD_POOL) && !defined(__EMSCRIPTEN__)
m_thread_pool = std::make_shared<thread_pool_bs>();
#else
m_thread_pool = nullptr;
#endif

}

sinsp::~sinsp()
Expand Down Expand Up @@ -2167,3 +2170,19 @@ void sinsp::set_track_connection_status(bool enabled)
m_parser->set_track_connection_status(enabled);
}

std::shared_ptr<thread_pool> sinsp::get_thread_pool()
{
return m_thread_pool;
}

bool sinsp::set_thread_pool(std::shared_ptr<thread_pool> tpool)
{
if(!m_thread_pool)
{
m_thread_pool = tpool;
return true;
}

return false;
}

9 changes: 5 additions & 4 deletions userspace/libsinsp/sinsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,9 @@ class SINSP_PUBLIC sinsp : public capture_stats_source
bool get_track_connection_status() const;
inline void set_track_connection_status(bool enabled);

std::shared_ptr<thread_pool> get_thread_pool();
bool set_thread_pool(std::shared_ptr<thread_pool> tpool);

/**
* \brief Get a new timestamp.
*
Expand Down Expand Up @@ -1203,6 +1206,8 @@ class SINSP_PUBLIC sinsp : public capture_stats_source

int32_t m_quantization_interval = -1;

std::shared_ptr<thread_pool> m_thread_pool;

public:
std::unique_ptr<sinsp_thread_manager> m_thread_manager;

Expand Down Expand Up @@ -1404,10 +1409,6 @@ class SINSP_PUBLIC sinsp : public capture_stats_source
// A registry that managers the state tables of this inspector
std::shared_ptr<libsinsp::state::table_registry> m_table_registry;

//
// A thread pool capable of running non-blocking recurring routines
std::shared_ptr<thread_pool> m_thread_pool;

sinsp_observer* m_observer{nullptr};

bool m_inited;
Expand Down
20 changes: 11 additions & 9 deletions userspace/libsinsp/test/plugins.ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -930,44 +930,46 @@ TEST_F(sinsp_with_test_input, plugin_metrics)

#endif

#if !defined(__EMSCRIPTEN__)
#if defined(ENABLE_THREAD_POOL) && !defined(__EMSCRIPTEN__)

TEST_F(sinsp_with_test_input, plugin_routines)
{
auto p = register_plugin(&m_inspector, get_plugin_api_sample_routines);
open_inspector();

ASSERT_NE(m_inspector.m_thread_pool, nullptr);
auto tp = m_inspector.get_thread_pool();

ASSERT_NE(tp, nullptr);

// step #0: the plugins subscribes a routine on capture open
auto routines_num = m_inspector.m_thread_pool->routines_num();
auto routines_num = tp->routines_num();
ASSERT_EQ(routines_num, 1);

// step #1: the plugin subscribes another routine
add_event_advance_ts(increasing_ts(), 0, PPME_SYSCALL_OPEN_E, 3, "/tmp/the_file", PPM_O_RDWR, 0);
routines_num = m_inspector.m_thread_pool->routines_num();
routines_num = tp->routines_num();
ASSERT_EQ(routines_num, 2);

// step #2: the plugin unsubscribes the previous routine
add_event_advance_ts(increasing_ts(), 0, PPME_SYSCALL_OPEN_E, 3, "/tmp/the_file", PPM_O_RDWR, 0);
routines_num = m_inspector.m_thread_pool->routines_num();
routines_num = tp->routines_num();
ASSERT_EQ(routines_num, 1);

// step #3: the plugin subscribes another routine
add_event_advance_ts(increasing_ts(), 0, PPME_SYSCALL_OPEN_E, 3, "/tmp/the_file", PPM_O_RDWR, 0);
routines_num = m_inspector.m_thread_pool->routines_num();
routines_num = tp->routines_num();
ASSERT_EQ(routines_num, 2);

// step #4: the plugin sets a flag that causes the previous routine to be unsubscibed
add_event_advance_ts(increasing_ts(), 0, PPME_SYSCALL_OPEN_E, 3, "/tmp/the_file", PPM_O_RDWR, 0);
std::this_thread::sleep_for(std::chrono::nanoseconds(1000)); //wait for a bit to let routine finish
routines_num = m_inspector.m_thread_pool->routines_num();
routines_num = tp->routines_num();
ASSERT_EQ(routines_num, 1);

// step: #5: the plugin doesn't unsubscribe the last routine, but the thread pool shuould unsubscribe it on capture close
m_inspector.close();
std::this_thread::sleep_for(std::chrono::nanoseconds(100));; //wait for a bit to let routine finish
routines_num = m_inspector.m_thread_pool->routines_num();
std::this_thread::sleep_for(std::chrono::nanoseconds(100)); //wait for a bit to let routine finish
routines_num = tp->routines_num();
ASSERT_EQ(routines_num, 0);
}

Expand Down
Loading

0 comments on commit 8f6f9df

Please sign in to comment.