Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion sbin/unit-tests
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,39 @@ run_coordinator_tests() {
done
}

#------------------------------------------------------------------------------
# Run C++ coordinator unit tests
#------------------------------------------------------------------------------
run_cpp_coord_tests() {
print_separator
echo "# Running C++ coordinator unit tests"
CPP_COORD_TESTS_DIR="$(cd $BINDIR/tests/cpptests/coord_tests; pwd)"
cd $ROOT_DIR/tests/cpptests/coord_tests
TEST_NAME=rstest_coord setup_sanitizer

if [[ -z $TEST ]]; then
# Run all C++ coordinator tests
LOG_FILE="${LOGS_DIR}/rstest_coord.log"
echo "Running all C++ coordinator tests (log: $LOG_FILE)"
{ $CPP_COORD_TESTS_DIR/rstest_coord > "$LOG_FILE" 2>&1; test_result=$?; (( EXIT_CODE |= $test_result )); } || true

# Parse and display individual test results
parse_cpp_test_results "$LOG_FILE"
else
# Run single C++ coordinator test if requested
if [[ -f $CPP_COORD_TESTS_DIR/rstest_coord ]]; then
LOG_FILE="${LOGS_DIR}/rstest_coord_${TEST}.log"
echo "Running C++ coordinator test: $TEST (log: $LOG_FILE)"
{ $CPP_COORD_TESTS_DIR/rstest_coord --gtest_filter=$TEST > "$LOG_FILE" 2>&1; test_result=$?; (( EXIT_CODE |= $test_result )); } || true

# Parse and display results
parse_cpp_test_results "$LOG_FILE"
else
echo "C++ coordinator test binary not found: $CPP_COORD_TESTS_DIR/rstest_coord"
fi
fi
}
Comment on lines +215 to +243

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

The new function run_cpp_coord_tests is almost identical to the existing run_cpp_tests function. This duplication can be avoided by creating a generic function that takes parameters like the test binary name, directories, and a description. This would make the script more maintainable and easier to extend with new test suites in the future.

For example, you could create a generic function like this:

run_gtest_suite() {
    local test_binary_name=$1
    local test_binary_dir_rel=$2
    local test_source_dir_rel=$3
    local description=$4
    # ... implementation ...
}

And then call it for both test suites:

run_cpp_tests() {
    run_gtest_suite "rstest" "tests/cpptests" "tests/cpptests" "C++ unit tests"
}

run_cpp_coord_tests() {
    run_gtest_suite "rstest_coord" "tests/cpptests/coord_tests" "tests/cpptests/coord_tests" "C++ coordinator unit tests"
}
Copy Context


#------------------------------------------------------------------------------
# Run all unit tests
#------------------------------------------------------------------------------
Expand All @@ -219,8 +252,11 @@ run_all_tests() {
# Run C++ tests
run_cpp_tests

# Run coordinator tests
# Run C coordinator tests
run_coordinator_tests

# Run C++ coordinator tests
run_cpp_coord_tests
}

#------------------------------------------------------------------------------
Expand Down
54 changes: 51 additions & 3 deletions src/coord/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
#include "hiredis/hiredis.h"
#include "module.h"


#include <string.h>
#include <stdlib.h>

extern RedisModuleCtx *RSDummyContext;

#define CEIL_DIV(a, b) ((a + b - 1) / b)
#define MAX_SEARCH_IO_THREADS (1 << 8)
#define CONFIG_FROM_RSCONFIG(c) ((SearchClusterConfig *)(c)->chainedConfig)

static SearchClusterConfig* getOrCreateRealConfig(RSConfig *config){
Expand Down Expand Up @@ -80,7 +81,10 @@ int triggerConnPerShard(RSConfig *config) {
} else {
connPerShard = config->numWorkerThreads + 1;
}
MR_UpdateConnPerShard(connPerShard);
// The connPerShard will be applied to each of the ConnManager in each of the IO threads.
size_t conn_pool_size = CEIL_DIV(connPerShard, realConfig->coordinatorIOThreads);

MR_UpdateConnPoolSize(conn_pool_size);
return REDISMODULE_OK;
}

Expand Down Expand Up @@ -152,7 +156,7 @@ CONFIG_GETTER(getSearchThreads) {

// search-threads
int set_search_threads(const char *name, long long val, void *privdata,
RedisModuleString **err) {
RedisModuleString **err) {
RSConfig *config = (RSConfig *)privdata;
SearchClusterConfig *realConfig = getOrCreateRealConfig(config);
realConfig->coordinatorPoolSize = (size_t)val;
Expand All @@ -165,6 +169,36 @@ long long get_search_threads(const char *name, void *privdata) {
return (long long)realConfig->coordinatorPoolSize;
}

// SEARCH_IO_THREADS

CONFIG_SETTER(setSearchIOThreads) {
SearchClusterConfig *realConfig = getOrCreateRealConfig((RSConfig *)config);
int acrc = AC_GetSize(ac, &realConfig->coordinatorIOThreads, AC_F_GE1);
CHECK_RETURN_PARSE_ERROR(acrc);
// Todo, the same as with the coord threads setting, this has no actual impact
return REDISMODULE_OK;
}

CONFIG_GETTER(getSearchIOThreads) {
SearchClusterConfig *realConfig = getOrCreateRealConfig((RSConfig *)config);
return sdsfromlonglong(realConfig->coordinatorIOThreads);
}

// search-io-threads
int set_search_io_threads(const char *name, long long val, void *privdata,
RedisModuleString **err) {
RSConfig *config = (RSConfig *)privdata;
SearchClusterConfig *realConfig = getOrCreateRealConfig(config);
realConfig->coordinatorIOThreads = (size_t)val;
return REDISMODULE_OK;
}

long long get_search_io_threads(const char *name, void *privdata) {
RSConfig *config = (RSConfig *)privdata;
SearchClusterConfig *realConfig = getOrCreateRealConfig(config);
return (long long)realConfig->coordinatorIOThreads;
}

// TOPOLOGY_VALIDATION_TIMEOUT
CONFIG_SETTER(setTopologyValidationTimeout) {
SearchClusterConfig *realConfig = getOrCreateRealConfig((RSConfig *)config);
Expand Down Expand Up @@ -223,6 +257,11 @@ static RSConfigOptions clusterOptions_g = {
.setValue = setSearchThreads,
.getValue = getSearchThreads,
.flags = RSCONFIGVAR_F_IMMUTABLE,},
{.name = "SEARCH_IO_THREADS",
.helpText = "Sets the number of I/O threads in the coordinator",
.setValue = setSearchIOThreads,
.getValue = getSearchIOThreads,
.flags = RSCONFIGVAR_F_IMMUTABLE},
{.name = "TOPOLOGY_VALIDATION_TIMEOUT",
.helpText = "Sets the timeout for topology validation (in milliseconds). After this timeout, "
"any pending requests will be processed, even if the topology is not fully connected. "
Expand Down Expand Up @@ -280,6 +319,15 @@ int RegisterClusterModuleConfig(RedisModuleCtx *ctx) {
)
)

RM_TRY(
RedisModule_RegisterNumericConfig(
ctx, "search-io-threads", COORDINATOR_IO_THREADS_DEFAULT_SIZE,
REDISMODULE_CONFIG_IMMUTABLE | REDISMODULE_CONFIG_UNPREFIXED, 1,
MAX_SEARCH_IO_THREADS, get_search_io_threads, set_search_io_threads, NULL,
(void*)&RSGlobalConfig
)
)

RM_TRY(
RedisModule_RegisterNumericConfig (
ctx, "search-topology-validation-timeout", DEFAULT_TOPOLOGY_VALIDATION_TIMEOUT,
Expand Down
3 changes: 3 additions & 0 deletions src/coord/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ typedef struct {
size_t connPerShard;
size_t cursorReplyThreshold;
size_t coordinatorPoolSize; // number of threads in the coordinator thread pool
size_t coordinatorIOThreads; // number of I/O threads in the coordinator
size_t topologyValidationTimeoutMS;
} SearchClusterConfig;

Expand All @@ -33,6 +34,7 @@ extern RedisModuleString *config_dummy_password;
#define CLUSTER_TYPE_RLABS "redislabs"

#define COORDINATOR_POOL_DEFAULT_SIZE 20
#define COORDINATOR_IO_THREADS_DEFAULT_SIZE 1
#define DEFAULT_TOPOLOGY_VALIDATION_TIMEOUT 30000
#define DEFAULT_CURSOR_REPLY_THRESHOLD 1
#define DEFAULT_CONN_PER_SHARD 0
Expand All @@ -44,6 +46,7 @@ extern RedisModuleString *config_dummy_password;
.timeoutMS = 0, \
.cursorReplyThreshold = DEFAULT_CURSOR_REPLY_THRESHOLD, \
.coordinatorPoolSize = COORDINATOR_POOL_DEFAULT_SIZE, \
.coordinatorIOThreads = COORDINATOR_IO_THREADS_DEFAULT_SIZE, \
.topologyValidationTimeoutMS = DEFAULT_TOPOLOGY_VALIDATION_TIMEOUT, \
}

Expand Down
4 changes: 2 additions & 2 deletions src/coord/debug_commands.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* GNU Affero General Public License v3 (AGPLv3).
*/
#include "coord/rmr/rmr.h"
#include "coord/rmr/rq.h"
#include "coord/rmr/io_runtime_ctx.h"
#include "debug_commands.h"
#include "debug_command_names.h"
#include "coord/rmr/redis_cluster.h"
Expand Down Expand Up @@ -53,7 +53,7 @@ DEBUG_COMMAND(clearTopology) {
return RedisModule_ReplyWithError(ctx, NODEBUG_ERR);
}
if (argc != 2) return RedisModule_WrongArity(ctx);
RQ_Debug_ClearPendingTopo();
MR_Debug_ClearPendingTopo();
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}

Expand Down
Loading