Skip to content

Commit

Permalink
Add support for SPUBLISH (#2838)
Browse files Browse the repository at this point in the history
* implementation of SPUBLISH

* sort methods by name

* test cluster spublish with no redirects

* use injected cluster client in RedisClusterPubSubConnectionIntegrationTests
  • Loading branch information
atakavci committed May 2, 2024
1 parent 21514ef commit dfbdddc
Show file tree
Hide file tree
Showing 27 changed files with 326 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1295,7 +1295,7 @@ public RedisFuture<StreamScanCursor> hscan(KeyValueStreamingChannel<K, V> channe

@Override
public RedisFuture<StreamScanCursor> hscanNovalues(KeyStreamingChannel<K> channel, K key, ScanCursor scanCursor,
ScanArgs scanArgs) {
ScanArgs scanArgs) {
return dispatch(commandBuilder.hscanNoValuesStreaming(channel, key, scanCursor, scanArgs));
}

Expand Down Expand Up @@ -2069,6 +2069,11 @@ public RedisFuture<Set<V>> spop(K key, long count) {
return dispatch(commandBuilder.spop(key, count));
}

@Override
public RedisFuture<Long> spublish(K shardChannel, V message) {
return dispatch(commandBuilder.spublish(shardChannel, message));
}

@Override
public RedisFuture<V> srandmember(K key) {
return dispatch(commandBuilder.srandmember(key));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2147,6 +2147,11 @@ public Flux<V> spop(K key, long count) {
return createDissolvingFlux(() -> commandBuilder.spop(key, count));
}

@Override
public Mono<Long> spublish(K shardChannel, V message) {
return createMono(() -> commandBuilder.spublish(shardChannel, message));
}

@Override
public Mono<V> srandmember(K key) {
return createMono(() -> commandBuilder.srandmember(key));
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/lettuce/core/RedisCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -2754,6 +2754,13 @@ Command<K, V, Set<V>> spop(K key, long count) {
return createCommand(SPOP, new ValueSetOutput<>(codec), args);
}

Command<K, V, Long> spublish(K shardChannel, V message) {
LettuceAssert.notNull(shardChannel, "ShardChannel " + MUST_NOT_BE_NULL);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(shardChannel).addValue(message);
return createCommand(SPUBLISH, new IntegerOutput<>(codec), args);
}

Command<K, V, V> srandmember(K key) {
notNullKey(key);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ public interface BaseRedisAsyncCommands<K, V> {
*/
RedisFuture<Long> pubsubNumpat();

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
RedisFuture<Long> spublish(K shardChannel, V message);

/**
* Echo the given string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ public interface BaseRedisReactiveCommands<K, V> {
*/
Mono<Long> pubsubNumpat();

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
Mono<Long> spublish(K shardChannel, V message);

/**
* Echo the given string.
*
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ public interface BaseRedisCommands<K, V> {
*/
Long pubsubNumpat();

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
Long spublish(K shardChannel, V message);

/**
* Echo the given string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ public void punsubscribed(K pattern, long count) {
notifications.punsubscribed(getNode(), pattern, count);
}

@Override
public void smessage(K shardChannel, V message) {
notifications.smessage(getNode(), shardChannel, message);
}

@Override
public void ssubscribed(K channel, long count) {
notifications.ssubscribed(getNode(), channel, count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ protected void notifyListeners(PubSubMessage<K, V> output) {
case unsubscribe:
multicast.unsubscribed(clusterNode, output.channel(), output.count());
break;
case smessage:
multicast.smessage(clusterNode, output.channel(), output.body());
break;
case ssubscribe:
multicast.ssubscribed(clusterNode, output.channel(), output.count());
break;
Expand Down Expand Up @@ -192,6 +195,12 @@ public void punsubscribed(RedisClusterNode node, K pattern, long count) {
clusterListeners.forEach(listener -> listener.punsubscribed(node, pattern, count));
}

@Override
public void smessage(RedisClusterNode node, K shardChannel, V message) {
getListeners().forEach(listener -> listener.smessage(shardChannel, message));
clusterListeners.forEach(listener -> listener.smessage(node, shardChannel, message));
}

@Override
public void ssubscribed(RedisClusterNode node, K channel, long count) {
getListeners().forEach(listener -> listener.ssubscribed(channel, count));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ public interface BaseNodeSelectionAsyncCommands<K, V> {
*/
AsyncExecutions<Long> pubsubNumpat();

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
AsyncExecutions<Long> spublish(K shardChannel, V message);

/**
* Echo the given string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ public interface BaseNodeSelectionCommands<K, V> {
*/
Executions<Long> pubsubNumpat();

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
Executions<Long> spublish(K shardChannel, V message);

/**
* Echo the given string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public void punsubscribed(RedisClusterNode node, K pattern, long count) {
// empty adapter method
}

@Override
public void smessage(RedisClusterNode node, K shardChannel, V message) {
// empty adapter method
}

@Override
public void ssubscribed(RedisClusterNode node, K channel, long count) {
// empty adapter method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ public interface RedisClusterPubSubListener<K, V> {
*/
void punsubscribed(RedisClusterNode node, K pattern, long count);

/**
* Message received from a shard channel subscription.
*
* @param node the {@link RedisClusterNode} from which the {@code message} originates.
* @param shardChannel shard channel.
* @param message Message.
* @since 7.0
*/
default void smessage(RedisClusterNode node, K shardChannel, V message){
message(node, shardChannel, message);
}

/**
* Subscribed to a shard channel.
*
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/protocol/CommandType.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public enum CommandType implements ProtocolKeyword {

// Pub/Sub

PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE,
PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE, SPUBLISH,

// Sets

Expand Down
21 changes: 13 additions & 8 deletions src/main/java/io/lettuce/core/pubsub/PubSubCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,19 @@ final Command<K, V, V> punsubscribe(K... patterns) {
return pubSubCommand(PUNSUBSCRIBE, new PubSubOutput<>(codec), patterns);
}

Command<K, V, Long> spublish(K shardChannel, V message) {
CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(shardChannel).addValue(message);
return createCommand(SPUBLISH, new IntegerOutput<>(codec), args);
}

@SafeVarargs
final Command<K, V, V> ssubscribe(K... shardChannels) {
LettuceAssert.notEmpty(shardChannels, "Shard channels " + MUST_NOT_BE_EMPTY);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKeys(shardChannels);
return createCommand(SSUBSCRIBE, new PubSubOutput<>(codec), args);
}

@SafeVarargs
final Command<K, V, V> subscribe(K... channels) {
LettuceAssert.notEmpty(channels, "Channels " + MUST_NOT_BE_EMPTY);
Expand All @@ -110,14 +123,6 @@ final Command<K, V, V> unsubscribe(K... channels) {
return pubSubCommand(UNSUBSCRIBE, new PubSubOutput<>(codec), channels);
}

@SafeVarargs
final Command<K, V, V> ssubscribe(K... shardChannels) {
LettuceAssert.notEmpty(shardChannels, "Shard channels " + MUST_NOT_BE_EMPTY);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKeys(shardChannels);
return createCommand(SSUBSCRIBE, new PubSubOutput<>(codec), args);
}

@SafeVarargs
final <T> Command<K, V, T> pubSubCommand(CommandType type, CommandOutput<K, V, T> output, K... keys) {
return new Command<>(type, output, new PubSubCommandArgs<>(codec).addKeys(keys));
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ protected void notifyListeners(PubSubMessage<K, V> message) {
case unsubscribe:
listener.unsubscribed(message.channel(), message.count());
break;
case smessage:
listener.smessage(message.channel(), message.body());
break;
case ssubscribe:
listener.ssubscribed(message.channel(), message.count());
break;
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/lettuce/core/pubsub/PubSubOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class PubSubOutput<K, V> extends CommandOutput<K, V, V> implements PubSub

public enum Type {

message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe;
message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe, smessage;

private final static Set<String> names = new HashSet<>();

Expand Down Expand Up @@ -108,6 +108,7 @@ private void handleOutput(ByteBuffer bytes) {
pattern = codec.decodeKey(bytes);
break;
}
case smessage:
case message:
if (channel == null) {
channel = codec.decodeKey(bytes);
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public void punsubscribed(K pattern, long count) {
// empty adapter method
}

@Override
public void smessage(K shardChannel, V message) {
// empty adapter method
}

@Override
public void ssubscribed(K shardChannel, long count) {
// empty adapter method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ public RedisFuture<Map<K, Long>> pubsubShardNumsub(K... shardChannels) {
return dispatch(commandBuilder.pubsubShardNumsub(shardChannels));
}

@Override
public RedisFuture<Long> spublish(K shardChannel, V message) {
return dispatch(commandBuilder.spublish(shardChannel, message));
}

@Override
@SuppressWarnings("unchecked")
public RedisFuture<Void> ssubscribe(K... channels) {
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,15 @@ default void ssubscribed(K shardChannel, long count) {
subscribed(shardChannel, count);
}

/**
* Message received from a shard channel subscription.
*
* @param shardChannel shard channel.
* @param message Message.
* @since 7.0
*/
default void smessage(K shardChannel, V message) {
message(shardChannel, message);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ public Mono<Map<K, Long>> pubsubShardNumsub(K... shardChannels) {
return createMono(() -> commandBuilder.pubsubShardNumsub(shardChannels));
}

@Override
public Mono<Long> spublish(K shardChannel, V message) {
return createMono(() -> commandBuilder.publish(shardChannel, message));
}

@Override
public Mono<Void> ssubscribe(K... shardChannels) {
return createFlux(() -> commandBuilder.ssubscribe(shardChannels)).then();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ interface BaseRedisCoroutinesCommands<K : Any, V : Any> {
*/
suspend fun pubsubNumpat(): Long

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
suspend fun spublish(shardChannel: K, message: V): Long?

/**
* Echo the given string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ internal class BaseRedisCoroutinesCommandsImpl<K : Any, V : Any>(internal val op

override suspend fun pubsubNumpat(): Long = ops.pubsubNumpat().awaitSingle()

override suspend fun spublish(shardChannel: K, message: V): Long? = ops.spublish(shardChannel, message).awaitFirstOrNull()

override suspend fun echo(msg: V): V = ops.echo(msg).awaitSingle()

override suspend fun role(): List<Any> = ops.role().asFlow().toList()
Expand Down
10 changes: 10 additions & 0 deletions src/main/templates/io/lettuce/core/api/BaseRedisCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ public interface BaseRedisCommands<K, V> {
*/
Long pubsubNumpat();

/**
* Post a message to a shard channel.
*
* @param shardChannel the shard channel type: key.
* @param message the message type: value.
* @return Long integer-reply the number of clients that received the message.
* @since 7.0
*/
Long spublish(K shardChannel, V message);

/**
* Echo the given string.
*
Expand Down
Loading

0 comments on commit dfbdddc

Please sign in to comment.