From 41dea069c745b0c47b8e152b0ab2a5aa65763b15 Mon Sep 17 00:00:00 2001 From: Harkrishn Patro Date: Mon, 8 Apr 2024 18:36:54 +0000 Subject: [PATCH] Propagate sharded pubsub data via replication link Signed-off-by: Harkrishn Patro --- src/commands.def | 2 +- src/commands/spublish.json | 3 ++- src/pubsub.c | 5 ++-- tests/unit/cluster/links.tcl | 7 +++--- tests/unit/cluster/sharded-pubsub.tcl | 34 ++++++++++++--------------- 5 files changed, 24 insertions(+), 27 deletions(-) diff --git a/src/commands.def b/src/commands.def index 7007568b61..8d3543d7c4 100644 --- a/src/commands.def +++ b/src/commands.def @@ -10768,7 +10768,7 @@ struct COMMAND_STRUCT serverCommandTable[] = { {MAKE_CMD("publish","Posts a message to a channel.","O(N+M) where N is the number of clients subscribed to the receiving channel and M is the total number of subscribed patterns (by any client).","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PUBLISH_History,0,PUBLISH_Tips,0,publishCommand,3,CMD_PUBSUB|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_MAY_REPLICATE|CMD_SENTINEL,0,PUBLISH_Keyspecs,0,NULL,2),.args=PUBLISH_Args}, {MAKE_CMD("pubsub","A container for Pub/Sub commands.","Depends on subcommand.","2.8.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PUBSUB_History,0,PUBSUB_Tips,0,NULL,-2,0,0,PUBSUB_Keyspecs,0,NULL,0),.subcommands=PUBSUB_Subcommands}, {MAKE_CMD("punsubscribe","Stops listening to messages published to channels that match one or more patterns.","O(N) where N is the number of patterns to unsubscribe.","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PUNSUBSCRIBE_History,0,PUNSUBSCRIBE_Tips,0,punsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,PUNSUBSCRIBE_Keyspecs,0,NULL,1),.args=PUNSUBSCRIBE_Args}, -{MAKE_CMD("spublish","Post a message to a shard channel","O(N) where N is the number of clients subscribed to the receiving shard channel.","7.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SPUBLISH_History,0,SPUBLISH_Tips,0,spublishCommand,3,CMD_PUBSUB|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_MAY_REPLICATE,0,SPUBLISH_Keyspecs,1,NULL,2),.args=SPUBLISH_Args}, +{MAKE_CMD("spublish","Post a message to a shard channel","O(N) where N is the number of clients subscribed to the receiving shard channel.","7.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SPUBLISH_History,0,SPUBLISH_Tips,0,spublishCommand,3,CMD_PUBSUB|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_MAY_REPLICATE|CMD_WRITE,0,SPUBLISH_Keyspecs,1,NULL,2),.args=SPUBLISH_Args}, {MAKE_CMD("ssubscribe","Listens for messages published to shard channels.","O(N) where N is the number of shard channels to subscribe to.","7.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SSUBSCRIBE_History,0,SSUBSCRIBE_Tips,0,ssubscribeCommand,-2,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0,SSUBSCRIBE_Keyspecs,1,NULL,1),.args=SSUBSCRIBE_Args}, {MAKE_CMD("subscribe","Listens for messages published to channels.","O(N) where N is the number of channels to subscribe to.","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SUBSCRIBE_History,0,SUBSCRIBE_Tips,0,subscribeCommand,-2,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,SUBSCRIBE_Keyspecs,0,NULL,1),.args=SUBSCRIBE_Args}, {MAKE_CMD("sunsubscribe","Stops listening to messages posted to shard channels.","O(N) where N is the number of shard channels to unsubscribe.","7.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SUNSUBSCRIBE_History,0,SUNSUBSCRIBE_Tips,0,sunsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0,SUNSUBSCRIBE_Keyspecs,1,NULL,1),.args=SUNSUBSCRIBE_Args}, diff --git a/src/commands/spublish.json b/src/commands/spublish.json index da1360de7c..cab88545b5 100644 --- a/src/commands/spublish.json +++ b/src/commands/spublish.json @@ -11,7 +11,8 @@ "LOADING", "STALE", "FAST", - "MAY_REPLICATE" + "MAY_REPLICATE", + "WRITE" ], "arguments": [ { diff --git a/src/pubsub.c b/src/pubsub.c index b6db719b66..4e971adb97 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -717,9 +717,8 @@ void channelList(client *c, sds pat, kvstore *pubsub_channels) { /* SPUBLISH */ void spublishCommand(client *c) { - int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],1); - if (!server.cluster_enabled) - forceCommandPropagation(c,PROPAGATE_REPL); + int receivers = pubsubPublishMessage(c->argv[1], c->argv[2], 1); + forceCommandPropagation(c, PROPAGATE_REPL); addReplyLongLong(c,receivers); } diff --git a/tests/unit/cluster/links.tcl b/tests/unit/cluster/links.tcl index a202c378bd..8b595ee659 100644 --- a/tests/unit/cluster/links.tcl +++ b/tests/unit/cluster/links.tcl @@ -74,7 +74,7 @@ start_cluster 1 2 {tags {external:skip cluster}} { set primary [Rn $primary_id] set replica1 [Rn $replica1_id] - test "Broadcast message across a cluster shard while a cluster link is down" { + test "Broadcast message on a primary across a cluster shard while a cluster link is down" { set replica1_node_id [$replica1 CLUSTER MYID] set channelname ch3 @@ -94,7 +94,7 @@ start_cluster 1 2 {tags {external:skip cluster}} { # Verify number of links with cluster stable state assert_equal [expr [number_of_peers $primary_id]*2] [number_of_links $primary_id] - # Disconnect the cluster between primary and replica1 and publish a message. + # Disconnect the cluster link from primary to replica1 and publish a message. $primary MULTI $primary DEBUG CLUSTERLINK KILL TO $replica1_node_id $primary SPUBLISH $channelname hello @@ -113,7 +113,8 @@ start_cluster 1 2 {tags {external:skip cluster}} { # Publish a message afterwards. $primary SPUBLISH $channelname world - # Verify replica1 has received only (world) / hello is lost. + # Verify replica1 has received both (hello/world), irrespective of the cluster link health. + assert_equal "smessage ch3 hello" [$subscribeclient1 read] assert_equal "smessage ch3 world" [$subscribeclient1 read] # Verify replica2 has received both messages (hello/world) diff --git a/tests/unit/cluster/sharded-pubsub.tcl b/tests/unit/cluster/sharded-pubsub.tcl index b5b19ff481..06e17533c0 100644 --- a/tests/unit/cluster/sharded-pubsub.tcl +++ b/tests/unit/cluster/sharded-pubsub.tcl @@ -5,13 +5,19 @@ start_cluster 1 1 {tags {external:skip cluster}} { set primary [Rn $primary_id] set replica [Rn $replica1_id] + test "Sharded pubsub publish behavior on a primary" { + assert_equal 0 [$primary spublish ch1 "hello"] + } + + test "Sharded pubsub publish behavior on a replica" { + assert_error "*MOVED*" {$replica spublish ch1 "hello"} + } + + test "Sharded pubsub publish behavior within multi/exec" { - foreach {node} {primary replica} { - set node [set $node] - $node MULTI - $node SPUBLISH ch1 "hello" - $node EXEC - } + $primary MULTI + $primary SPUBLISH ch1 "hello" + $primary EXEC } test "Sharded pubsub within multi/exec with cross slot operation" { @@ -29,10 +35,9 @@ start_cluster 1 1 {tags {external:skip cluster}} { $primary EXEC } {0 {}} - test "Sharded pubsub publish behavior within multi/exec with read operation on replica" { + test "Sharded pubsub publish behavior within multi/exec on replica" { $replica MULTI - $replica SPUBLISH foo "hello" - catch {[$replica GET foo]} err + catch {[$replica SPUBLISH foo "hello"]} err assert_match {MOVED*} $err catch {[$replica EXEC]} err assert_match {EXECABORT*} $err @@ -44,13 +49,4 @@ start_cluster 1 1 {tags {external:skip cluster}} { $primary SET foo bar $primary EXEC } {0 OK} - - test "Sharded pubsub publish behavior within multi/exec with write operation on replica" { - $replica MULTI - $replica SPUBLISH foo "hello" - catch {[$replica SET foo bar]} err - assert_match {MOVED*} $err - catch {[$replica EXEC]} err - assert_match {EXECABORT*} $err - } -} \ No newline at end of file +}