Introduce Multiple I/O Threads for Coordinator #4
base: 6313_base
Conversation
* wip
* pop from stash
* pop from stash
* small changes and comments
* added some more comments: WIP
* partial commit
* some progress
* code compiles
* steps forward
* refactor advanced
* fix const correctness
* fix tests and properly set configuration
* some code improvement
* apply review by copilot
* fix compilation issue in linux
* proper handling of I/O thread shutdown
* fix unit test
* handle uv_init in its thread
* start I/O threads at start
* fix problem with io callback
* some cluster tests passing
* improve handling of Topology updates
* properly initialize IORuntime
* fix change connection tests for now
* small clean
* use minValue in config test
* small fix
* remove duplicated call
* each IORuntime gets its own topology
* fixing the tests that could not work because of non-lazyness
* fix compilation error
* fix initialization problems of uv objects
* fix problem uvReplyClusterInfo
* fix race condition async init
* init timer lazily
* small edit in logging
* schedule check regardless of thread running
* proper initialization of IORuntime
* some synchronization work for io thread joining
* fix coord unit tests
* fix loop thread joining
* add unit tests for IORuntimeCtx
* improve handling of libUV objects
* properly initialize rr counter
* fix give more time to thread test specially for sanitizer cases
* handle new test
* small comment
* readd code removed by mistake
* fix test
* option to dynamically set IO threads
* add config test
* fix test sanitize
* get rid of the control plane runtime explicit handling
* fix leak
* fix some test
* fix leak of topology
* set uv thread name
* abstract the UVRuntime into an ADT
* bring back default config
* review naming and computation of conn_pool_size
* remove unused member
* refactor connPoolSize recomputation
* better conn stop handling
* add specific logs to follow error
* Revert "add specific logs to follow error" (This reverts commit 9d84dbb.)
* fix make IO threads config static
* fix leak
* fix test
* fix access not null
* set default coodr value to 1
* add tests with different io threads
* add ft aggregate test
* Apply suggestions from code review (Co-authored-by: GuyAv46 <[email protected]>)
* change namew MRCluster Free
* inline conn manager in Io runtime ctx
* refactor conn not passing loop everywhere
* cluster_g safe access
* do not depend on num workers for io thread setting
* remove unrequired pending mutex protection
* set max search io threads and remove unneeded atomic op
* fix the pending decrease of RQ

Co-authored-by: DvirDukhan <[email protected]>
Co-authored-by: GuyAv46 <[email protected]>
run_cpp_coord_tests() {
  print_separator
  echo "# Running C++ coordinator unit tests"
  CPP_COORD_TESTS_DIR="$(cd $BINDIR/tests/cpptests/coord_tests; pwd)"
  cd $ROOT_DIR/tests/cpptests/coord_tests
  TEST_NAME=rstest_coord setup_sanitizer

  if [[ -z $TEST ]]; then
    # Run all C++ coordinator tests
    LOG_FILE="${LOGS_DIR}/rstest_coord.log"
    echo "Running all C++ coordinator tests (log: $LOG_FILE)"
    { $CPP_COORD_TESTS_DIR/rstest_coord > "$LOG_FILE" 2>&1; test_result=$?; (( EXIT_CODE |= $test_result )); } || true

    # Parse and display individual test results
    parse_cpp_test_results "$LOG_FILE"
  else
    # Run single C++ coordinator test if requested
    if [[ -f $CPP_COORD_TESTS_DIR/rstest_coord ]]; then
      LOG_FILE="${LOGS_DIR}/rstest_coord_${TEST}.log"
      echo "Running C++ coordinator test: $TEST (log: $LOG_FILE)"
      { $CPP_COORD_TESTS_DIR/rstest_coord --gtest_filter=$TEST > "$LOG_FILE" 2>&1; test_result=$?; (( EXIT_CODE |= $test_result )); } || true

      # Parse and display results
      parse_cpp_test_results "$LOG_FILE"
    else
      echo "C++ coordinator test binary not found: $CPP_COORD_TESTS_DIR/rstest_coord"
    fi
  fi
}
[BestPractice]
The new function run_cpp_coord_tests is almost identical to the existing run_cpp_tests function. This duplication can be avoided by creating a generic function that takes parameters like the test binary name, directories, and a description. This would make the script more maintainable and easier to extend with new test suites in the future.
For example, you could create a generic function like this:
run_gtest_suite() {
  local test_binary_name=$1
  local test_binary_dir_rel=$2
  local test_source_dir_rel=$3
  local description=$4
  # ... implementation ...
}

And then call it for both test suites:

run_cpp_tests() {
  run_gtest_suite "rstest" "tests/cpptests" "tests/cpptests" "C++ unit tests"
}

run_cpp_coord_tests() {
  run_gtest_suite "rstest_coord" "tests/cpptests/coord_tests" "tests/cpptests/coord_tests" "C++ coordinator unit tests"
}

// To be called from the event loop thread, after the request is done, no need to protect the pending
void RQ_Done(MRWorkQueue *q) {
  uv_mutex_lock(&q->lock);
  --q->pending;
  uv_mutex_unlock(&q->lock);
}
[CriticalError]
The pending counter in MRWorkQueue appears to be accessed from multiple threads without synchronization, which could lead to a race condition.
• RQ_Pop() increments pending and is called from the libuv event loop thread.
• RQ_Done() decrements pending and can be called from the main Redis thread (via unblockHandler) or a worker thread.
The decrement --q->pending is not atomic. This could lead to incorrect pending counts, potentially causing the queue to process requests incorrectly under load. The comment on line 77 also appears to be incorrect based on the call paths.
I see from the commit history that you've worked on this area. Could you clarify if my understanding of the threading model is incorrect, or if an atomic operation is needed here?
Current:

// To be called from the event loop thread, after the request is done, no need to protect the pending
void RQ_Done(MRWorkQueue *q) {
  uv_mutex_lock(&q->lock);
  --q->pending;
  uv_mutex_unlock(&q->lock);
}

Suggested:

// This can be called from multiple threads, must be atomic.
void RQ_Done(MRWorkQueue *q) {
  __atomic_fetch_sub(&q->pending, 1, __ATOMIC_RELAXED);
}
Multiple I/O uvloop threads for Coordinator
This pull request introduces a significant architectural change to the coordinator component by implementing a multi-threaded I/O model. Previously, the coordinator relied on a single I/O thread for all cluster communication. The core change abstracts the I/O runtime into IORuntimeCtx objects, each managing its own libuv event loop, request queue, and connection manager. The main MRCluster now maintains a pool of these IORuntimeCtx instances, distributing requests among them using a round-robin mechanism.

The primary outcome of this change is enhanced scalability and performance for cluster operations by enabling parallel I/O. It aims to improve responsiveness and throughput, especially in high-concurrency scenarios. The refactoring also yields better isolation of I/O operations and a more robust framework for handling dynamic topology updates and connection management across multiple threads.
Key Changes
• Introduced an IORuntimeCtx structure to encapsulate a libuv event loop, request queue (MRWorkQueue), and connection manager (MRConnManager) for each I/O thread.
• The MRCluster now manages a pool of IORuntimeCtx instances, distributing incoming requests (e.g., MR_Fanout, MR_MapSingle) across them using a round-robin strategy.
• Refactored core cluster communication functions (MRCluster_SendCommand, MRCluster_FanoutCommand, MRCluster_GetConn) to operate on a specific IORuntimeCtx instance, rather than a single global connection manager.
• Implemented thread-safe mechanisms for IORuntimeCtx creation, starting, and freeing, including synchronization primitives (uv_mutex_t, uv_cond_t) to manage the thread lifecycle.
• Modified topology update logic (MR_UpdateTopology) to propagate new cluster topologies to all IORuntimeCtx instances, with each instance maintaining its own copy of the topology.
• Updated connection pool resizing (MR_UpdateConnPoolSize) to dynamically adjust connections per IORuntimeCtx based on the new SEARCH_IO_THREADS configuration.
• Added a new configuration option, SEARCH_IO_THREADS, to control the number of I/O threads used by the coordinator.
• Enhanced shutdown procedures for IORuntimeCtx to ensure proper cleanup of libuv handles and graceful thread joining.
• Refactored RQ (Request Queue) to be an integral part of IORuntimeCtx, handling request pushing and popping in a thread-safe manner.
• Introduced new C++ unit tests (test_cpp_io_runtime_ctx.cpp, test_cpp_cluster_io_threads.cpp) to validate the behavior and thread safety of the new I/O runtime context and multi-threaded cluster operations.
• Updated Python integration tests (test_multithread.py, test.py, test_config.py) to account for the new multi-threaded I/O model and the SEARCH_IO_THREADS configuration.

Affected Areas
• src/coord/rmr/io_runtime_ctx.c
• src/coord/rmr/io_runtime_ctx.h
• src/coord/rmr/cluster.c
• src/coord/rmr/cluster.h
• src/coord/rmr/rmr.c
• src/coord/rmr/rmr.h
• src/coord/rmr/rq.c
• src/coord/rmr/rq.h
• src/coord/rmr/conn.c
• src/coord/rmr/conn.h
• src/coord/rmr/cluster_topology.c
• src/coord/rmr/cluster_topology.h
• src/coord/config.c
• src/coord/config.h
• src/module.c
• src/coord/debug_commands.c
• tests/pytests/test_multithread.py
• tests/pytests/test.py
• tests/pytests/test_config.py
• tests/cpptests/coord_tests/test_cpp_io_runtime_ctx.cpp
• tests/cpptests/coord_tests/test_cpp_cluster_io_threads.cpp
• tests/cpptests/coord_tests/CMakeLists.txt
• tests/ctests/coord_tests/test_cluster.c
• tests/ctests/coord_tests/CMakeLists.txt
• tests/cpptests/CMakeLists.txt
• sbin/unit-tests
This summary was automatically generated by @propel-code-bot