diff --git a/src/valkey-cli.c b/src/valkey-cli.c index 0a4f1affa2..d2fa537036 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -218,6 +218,8 @@ static struct config { int shutdown; int monitor_mode; int pubsub_mode; + int pubsub_unsharded_count; /* channels and patterns */ + int pubsub_sharded_count; /* shard channels */ int blocking_state_aborted; /* used to abort monitor_mode and pubsub_mode. */ int latency_mode; int latency_dist_mode; @@ -2229,6 +2231,28 @@ static int cliReadReply(int output_raw_strings) { return REDIS_OK; } +/* Helper method to handle pubsub subscription/unsubscription. */ +static void handlePubSubMode(redisReply *reply) { + char *cmd = reply->element[0]->str; + int count = reply->element[2]->integer; + + /* Update counts based on the command type */ + if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "psubscribe") == 0 || strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) { + config.pubsub_unsharded_count = count; + } else if (strcmp(cmd, "ssubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { + config.pubsub_sharded_count = count; + } + + /* Update pubsub mode based on the current counts */ + if (config.pubsub_unsharded_count + config.pubsub_sharded_count == 0 && config.pubsub_mode) { + config.pubsub_mode = 0; + cliRefreshPrompt(); + } else if (config.pubsub_unsharded_count + config.pubsub_sharded_count > 0 && !config.pubsub_mode) { + config.pubsub_mode = 1; + cliRefreshPrompt(); + } +} + /* Simultaneously wait for pubsub messages from the server and input on stdin. */ static void cliWaitForMessagesOrStdin(void) { int show_info = config.output != OUTPUT_RAW && (isatty(STDOUT_FILENO) || getenv("FAKETTY")); @@ -2246,7 +2270,13 @@ static void cliWaitForMessagesOrStdin(void) { sds out = cliFormatReply(reply, config.output, 0); fwrite(out, sdslen(out), 1, stdout); fflush(stdout); + + if (isPubsubPush(reply)) { + handlePubSubMode(reply); + } + sdsfree(out); + freeReplyObject(reply); } } while (reply); @@ -2397,13 +2427,11 @@ static int cliSendCommand(int argc, char **argv, long repeat) { fflush(stdout); if (config.pubsub_mode || num_expected_pubsub_push > 0) { if (isPubsubPush(config.last_reply)) { + handlePubSubMode(config.last_reply); + if (num_expected_pubsub_push > 0 && !strcasecmp(config.last_reply->element[0]->str, command)) { /* This pushed message confirms the * [p|s][un]subscribe command. */ - if (is_subscribe && !config.pubsub_mode) { - config.pubsub_mode = 1; - cliRefreshPrompt(); - } if (--num_expected_pubsub_push > 0) { continue; /* We need more of these. */ } @@ -3117,6 +3145,13 @@ void cliSetPreferences(char **argv, int argc, int interactive) { else { printf("%sunknown valkey-cli preference '%s'\n", interactive ? "" : ".valkeyclirc: ", argv[1]); } + } else if (!strcasecmp(argv[0], ":get") && argc >= 2) { + if (!strcasecmp(argv[1], "pubsub")) { + printf("%d\n", config.pubsub_mode); + } else { + printf("%sunknown valkey-cli get option '%s'\n", interactive ? "" : ".valkeyclirc: ", argv[1]); + } + fflush(stdout); } else { printf("%sunknown valkey-cli internal command '%s'\n", interactive ? "" : ".valkeyclirc: ", argv[0]); } @@ -9495,6 +9530,8 @@ int main(int argc, char **argv) { config.shutdown = 0; config.monitor_mode = 0; config.pubsub_mode = 0; + config.pubsub_unsharded_count = 0; + config.pubsub_sharded_count = 0; config.blocking_state_aborted = 0; config.latency_mode = 0; config.latency_dist_mode = 0; diff --git a/tests/integration/valkey-cli.tcl b/tests/integration/valkey-cli.tcl index 0c15af74f9..a56818b8c2 100644 --- a/tests/integration/valkey-cli.tcl +++ b/tests/integration/valkey-cli.tcl @@ -608,6 +608,232 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS assert_equal "a\n1\nb\n2\nc\n3" [exec {*}$cmdline ZRANGE new_zset 0 -1 WITHSCORES] } + test "valkey-cli pubsub mode with single standard channel subscription" { + set fd [open_cli] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + write_cli $fd "SUBSCRIBE ch1" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "UNSUBSCRIBE ch1" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + close_cli $fd + } + + test "valkey-cli pubsub mode with multiple standard channel subscriptions" { + set fd [open_cli] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + write_cli $fd "SUBSCRIBE ch1 ch2 ch3" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "UNSUBSCRIBE" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + close_cli $fd + } + + test "valkey-cli pubsub mode with single shard channel subscription" { + set fd [open_cli] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + write_cli $fd "SSUBSCRIBE schannel1" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "SUNSUBSCRIBE schannel1" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + close_cli $fd + } + + test "valkey-cli pubsub mode with multiple shard channel subscriptions" { + + set fd [open_cli] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + write_cli $fd "SSUBSCRIBE schannel1 schannel2 schannel3" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "SUNSUBSCRIBE" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + close_cli $fd + } + + test "valkey-cli pubsub mode with single pattern channel subscription" { + set fd [open_cli] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + write_cli $fd "PSUBSCRIBE pattern1*" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "PUNSUBSCRIBE pattern1*" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + close_cli $fd + } + + test "valkey-cli pubsub mode with multiple pattern channel subscriptions" { + set fd [open_cli] + + write_cli $fd "PSUBSCRIBE pattern1* pattern2* pattern3*" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "PUNSUBSCRIBE" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + close_cli $fd + } + + test "valkey-cli pubsub mode when subscribing to the same channel" { + set fd [open_cli] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + write_cli $fd "SUBSCRIBE ch1 ch1" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "UNSUBSCRIBE" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + close_cli $fd + } + + test "valkey-cli pubsub mode with multiple subscription types" { + set fd [open_cli] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + write_cli $fd "SUBSCRIBE ch1 ch2 ch3" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "PSUBSCRIBE pattern*" + set response [read_cli $fd] + set lines [split $response "\n"] + assert_equal "psubscribe" [lindex $lines 0] + assert_equal "pattern*" [lindex $lines 1] + assert_equal "4" [lindex $lines 2] + + write_cli $fd "SSUBSCRIBE schannel" + set response [read_cli $fd] + set lines [split $response "\n"] + assert_equal "ssubscribe" [lindex $lines 0] + assert_equal "schannel" [lindex $lines 1] + assert_equal "1" [lindex $lines 2] + + write_cli $fd "PUNSUBSCRIBE pattern*" + set response [read_cli $fd] + set lines [split $response "\n"] + assert_equal "punsubscribe" [lindex $lines 0] + assert_equal "pattern*" [lindex $lines 1] + assert_equal "3" [lindex $lines 2] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "SUNSUBSCRIBE schannel" + set response [read_cli $fd] + set lines [split $response "\n"] + assert_equal "sunsubscribe" [lindex $lines 0] + assert_equal "schannel" [lindex $lines 1] + assert_equal "0" [lindex $lines 2] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "UNSUBSCRIBE" + set response [read_cli $fd] + + # Verify pubsub mode is no longer active + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + close_cli $fd + } + test "Valid Connection Scheme: redis://" { set cmdline [valkeycliuri "redis://" [srv host] [srv port]] assert_equal {PONG} [exec {*}$cmdline PING]