From 2bb39897f6a227ef56cbccde5b71f890ba2be6b5 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Wed, 11 Oct 2023 15:33:33 +0600 Subject: [PATCH] Fix binary variants of XRANGE and XREAD commands (#3571) where each individual element in the list is actually a complex object rather than a simple byte array. --- .../redis/clients/jedis/CommandObjects.java | 24 +++++++++---------- src/main/java/redis/clients/jedis/Jedis.java | 12 +++++----- .../redis/clients/jedis/PipelineBase.java | 13 +++++----- .../redis/clients/jedis/TransactionBase.java | 12 +++++----- .../redis/clients/jedis/UnifiedJedis.java | 12 +++++----- .../jedis/commands/StreamBinaryCommands.java | 12 +++++----- .../jedis/commands/StreamCommands.java | 20 ++++++++-------- .../StreamPipelineBinaryCommands.java | 14 +++++------ 8 files changed, 60 insertions(+), 59 deletions(-) diff --git a/src/main/java/redis/clients/jedis/CommandObjects.java b/src/main/java/redis/clients/jedis/CommandObjects.java index d51d3a89ff..819c168c7b 100644 --- a/src/main/java/redis/clients/jedis/CommandObjects.java +++ b/src/main/java/redis/clients/jedis/CommandObjects.java @@ -2402,24 +2402,24 @@ public final CommandObject> xrevrange(String key, String end, return new CommandObject<>(commandArguments(XREVRANGE).key(key).add(end).add(start).add(COUNT).add(count), BuilderFactory.STREAM_ENTRY_LIST); } - public final CommandObject> xrange(byte[] key, byte[] start, byte[] end) { + public final CommandObject> xrange(byte[] key, byte[] start, byte[] end) { return new CommandObject<>(commandArguments(XRANGE).key(key).add(start == null ? "-" : start).add(end == null ? "+" : end), - BuilderFactory.BINARY_LIST); + BuilderFactory.RAW_OBJECT_LIST); } - public final CommandObject> xrange(byte[] key, byte[] start, byte[] end, int count) { + public final CommandObject> xrange(byte[] key, byte[] start, byte[] end, int count) { return new CommandObject<>(commandArguments(XRANGE).key(key).add(start == null ? "-" : start).add(end == null ? "+" : end) - .add(COUNT).add(count), BuilderFactory.BINARY_LIST); + .add(COUNT).add(count), BuilderFactory.RAW_OBJECT_LIST); } - public final CommandObject> xrevrange(byte[] key, byte[] end, byte[] start) { + public final CommandObject> xrevrange(byte[] key, byte[] end, byte[] start) { return new CommandObject<>(commandArguments(XREVRANGE).key(key).add(end == null ? "+" : end).add(start == null ? "-" : start), - BuilderFactory.BINARY_LIST); + BuilderFactory.RAW_OBJECT_LIST); } - public final CommandObject> xrevrange(byte[] key, byte[] end, byte[] start, int count) { + public final CommandObject> xrevrange(byte[] key, byte[] end, byte[] start, int count) { return new CommandObject<>(commandArguments(XREVRANGE).key(key).add(end == null ? "+" : end).add(start == null ? "-" : start) - .add(COUNT).add(count), BuilderFactory.BINARY_LIST); + .add(COUNT).add(count), BuilderFactory.RAW_OBJECT_LIST); } public final CommandObject xack(String key, String group, StreamEntryID... ids) { @@ -2660,7 +2660,7 @@ public final CommandObject>>> xreadGrou return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE); } - public final CommandObject> xread(XReadParams xReadParams, Map.Entry... streams) { + public final CommandObject> xread(XReadParams xReadParams, Map.Entry... streams) { CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS); for (Map.Entry entry : streams) { args.key(entry.getKey()); @@ -2668,10 +2668,10 @@ public final CommandObject> xread(XReadParams xReadParams, Map.Entr for (Map.Entry entry : streams) { args.add(entry.getValue()); } - return new CommandObject<>(args, BuilderFactory.BINARY_LIST); + return new CommandObject<>(args, BuilderFactory.RAW_OBJECT_LIST); } - public final CommandObject> xreadGroup(byte[] groupName, byte[] consumer, + public final CommandObject> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry... streams) { CommandArguments args = commandArguments(XREADGROUP) .add(GROUP).add(groupName).add(consumer) @@ -2682,7 +2682,7 @@ public final CommandObject> xreadGroup(byte[] groupName, byte[] con for (Map.Entry entry : streams) { args.add(entry.getValue()); } - return new CommandObject<>(args, BuilderFactory.BINARY_LIST); + return new CommandObject<>(args, BuilderFactory.RAW_OBJECT_LIST); } // Stream commands diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index c024894caa..560b05d46e 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -4646,13 +4646,13 @@ public long hstrlen(final byte[] key, final byte[] field) { } @Override - public List xread(XReadParams xReadParams, Entry... streams) { + public List xread(XReadParams xReadParams, Entry... streams) { checkIsInMultiOrPipeline(); return connection.executeCommand(commandObjects.xread(xReadParams, streams)); } @Override - public List xreadGroup(byte[] groupName, byte[] consumer, + public List xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Entry... streams) { checkIsInMultiOrPipeline(); return connection.executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams)); @@ -4671,25 +4671,25 @@ public long xlen(byte[] key) { } @Override - public List xrange(byte[] key, byte[] start, byte[] end) { + public List xrange(byte[] key, byte[] start, byte[] end) { checkIsInMultiOrPipeline(); return connection.executeCommand(commandObjects.xrange(key, start, end)); } @Override - public List xrange(byte[] key, byte[] start, byte[] end, int count) { + public List xrange(byte[] key, byte[] start, byte[] end, int count) { checkIsInMultiOrPipeline(); return connection.executeCommand(commandObjects.xrange(key, start, end, count)); } @Override - public List xrevrange(byte[] key, byte[] end, byte[] start) { + public List xrevrange(byte[] key, byte[] end, byte[] start) { checkIsInMultiOrPipeline(); return connection.executeCommand(commandObjects.xrevrange(key, end, start)); } @Override - public List xrevrange(byte[] key, byte[] end, byte[] start, int count) { + public List xrevrange(byte[] key, byte[] end, byte[] start, int count) { checkIsInMultiOrPipeline(); return connection.executeCommand(commandObjects.xrevrange(key, end, start, count)); } diff --git a/src/main/java/redis/clients/jedis/PipelineBase.java b/src/main/java/redis/clients/jedis/PipelineBase.java index 48c5a62682..aa2b33e9e7 100644 --- a/src/main/java/redis/clients/jedis/PipelineBase.java +++ b/src/main/java/redis/clients/jedis/PipelineBase.java @@ -2968,22 +2968,22 @@ public Response xlen(byte[] key) { } @Override - public Response> xrange(byte[] key, byte[] start, byte[] end) { + public Response> xrange(byte[] key, byte[] start, byte[] end) { return appendCommand(commandObjects.xrange(key, start, end)); } @Override - public Response> xrange(byte[] key, byte[] start, byte[] end, int count) { + public Response> xrange(byte[] key, byte[] start, byte[] end, int count) { return appendCommand(commandObjects.xrange(key, start, end, count)); } @Override - public Response> xrevrange(byte[] key, byte[] end, byte[] start) { + public Response> xrevrange(byte[] key, byte[] end, byte[] start) { return appendCommand(commandObjects.xrevrange(key, end, start)); } @Override - public Response> xrevrange(byte[] key, byte[] end, byte[] start, int count) { + public Response> xrevrange(byte[] key, byte[] end, byte[] start, int count) { return appendCommand(commandObjects.xrevrange(key, end, start, count)); } @@ -3088,12 +3088,13 @@ public Response> xinfoConsumers(byte[] key, byte[] group) { } @Override - public Response> xread(XReadParams xReadParams, Map.Entry... streams) { + public Response> xread(XReadParams xReadParams, Map.Entry... streams) { return appendCommand(commandObjects.xread(xReadParams, streams)); } @Override - public Response> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry... streams) { + public Response> xreadGroup(byte[] groupName, byte[] consumer, + XReadGroupParams xReadGroupParams, Map.Entry... streams) { return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams)); } diff --git a/src/main/java/redis/clients/jedis/TransactionBase.java b/src/main/java/redis/clients/jedis/TransactionBase.java index 1769838673..a806dd0f92 100644 --- a/src/main/java/redis/clients/jedis/TransactionBase.java +++ b/src/main/java/redis/clients/jedis/TransactionBase.java @@ -3135,22 +3135,22 @@ public Response xlen(byte[] key) { } @Override - public Response> xrange(byte[] key, byte[] start, byte[] end) { + public Response> xrange(byte[] key, byte[] start, byte[] end) { return appendCommand(commandObjects.xrange(key, start, end)); } @Override - public Response> xrange(byte[] key, byte[] start, byte[] end, int count) { + public Response> xrange(byte[] key, byte[] start, byte[] end, int count) { return appendCommand(commandObjects.xrange(key, start, end, count)); } @Override - public Response> xrevrange(byte[] key, byte[] end, byte[] start) { + public Response> xrevrange(byte[] key, byte[] end, byte[] start) { return appendCommand(commandObjects.xrevrange(key, end, start)); } @Override - public Response> xrevrange(byte[] key, byte[] end, byte[] start, int count) { + public Response> xrevrange(byte[] key, byte[] end, byte[] start, int count) { return appendCommand(commandObjects.xrevrange(key, end, start, count)); } @@ -3255,12 +3255,12 @@ public Response> xinfoConsumers(byte[] key, byte[] group) { } @Override - public Response> xread(XReadParams xReadParams, Map.Entry... streams) { + public Response> xread(XReadParams xReadParams, Map.Entry... streams) { return appendCommand(commandObjects.xread(xReadParams, streams)); } @Override - public Response> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry... streams) { + public Response> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry... streams) { return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams)); } diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index 47e0f34374..2184534ca6 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -3058,22 +3058,22 @@ public long xlen(byte[] key) { } @Override - public List xrange(byte[] key, byte[] start, byte[] end) { + public List xrange(byte[] key, byte[] start, byte[] end) { return executeCommand(commandObjects.xrange(key, start, end)); } @Override - public List xrange(byte[] key, byte[] start, byte[] end, int count) { + public List xrange(byte[] key, byte[] start, byte[] end, int count) { return executeCommand(commandObjects.xrange(key, start, end, count)); } @Override - public List xrevrange(byte[] key, byte[] end, byte[] start) { + public List xrevrange(byte[] key, byte[] end, byte[] start) { return executeCommand(commandObjects.xrevrange(key, end, start)); } @Override - public List xrevrange(byte[] key, byte[] end, byte[] start, int count) { + public List xrevrange(byte[] key, byte[] end, byte[] start, int count) { return executeCommand(commandObjects.xrevrange(key, end, start, count)); } @@ -3178,12 +3178,12 @@ public List xinfoConsumers(byte[] key, byte[] group) { } @Override - public List xread(XReadParams xReadParams, Map.Entry... streams) { + public List xread(XReadParams xReadParams, Map.Entry... streams) { return executeCommand(commandObjects.xread(xReadParams, streams)); } @Override - public List xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry... streams) { + public List xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry... streams) { return executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams)); } // Stream commands diff --git a/src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java b/src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java index 0b43ebbcc3..01dec14d2d 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java @@ -15,13 +15,13 @@ default byte[] xadd(byte[] key, Map hash, XAddParams params) { long xlen(byte[] key); - List xrange(byte[] key, byte[] start, byte[] end); + List xrange(byte[] key, byte[] start, byte[] end); - List xrange(byte[] key, byte[] start, byte[] end, int count); + List xrange(byte[] key, byte[] start, byte[] end, int count); - List xrevrange(byte[] key, byte[] end, byte[] start); + List xrevrange(byte[] key, byte[] end, byte[] start); - List xrevrange(byte[] key, byte[] end, byte[] start, int count); + List xrevrange(byte[] key, byte[] end, byte[] start, int count); long xack(byte[] key, byte[] group, byte[]... ids); @@ -74,9 +74,9 @@ List xautoclaimJustId(byte[] key, byte[] groupName, byte[] consumerName, List xinfoConsumers(byte[] key, byte[] group); - List xread(XReadParams xReadParams, Map.Entry... streams); + List xread(XReadParams xReadParams, Map.Entry... streams); - List xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, + List xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry... streams); } diff --git a/src/main/java/redis/clients/jedis/commands/StreamCommands.java b/src/main/java/redis/clients/jedis/commands/StreamCommands.java index 6f1245b136..9c34cbc6a6 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamCommands.java @@ -123,16 +123,6 @@ default StreamEntryID xadd(String key, Map hash, XAddParams para */ long xgroupDelConsumer(String key, String groupName, String consumerName); - /** - * XPENDING key group - */ - StreamPendingSummary xpending(String key, String groupName); - - /** - * XPENDING key group [[IDLE min-idle-time] start end count [consumer]] - */ - List xpending(String key, String groupName, XPendingParams params); - /** * XDEL key ID [ID ...] */ @@ -148,6 +138,16 @@ default StreamEntryID xadd(String key, Map hash, XAddParams para */ long xtrim(String key, XTrimParams params); + /** + * XPENDING key group + */ + StreamPendingSummary xpending(String key, String groupName); + + /** + * XPENDING key group [[IDLE min-idle-time] start end count [consumer]] + */ + List xpending(String key, String groupName, XPendingParams params); + /** * {@code XCLAIM key group consumer min-idle-time ... * [IDLE ] [TIME ] [RETRYCOUNT ] diff --git a/src/main/java/redis/clients/jedis/commands/StreamPipelineBinaryCommands.java b/src/main/java/redis/clients/jedis/commands/StreamPipelineBinaryCommands.java index 8acb88a049..8198443009 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamPipelineBinaryCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamPipelineBinaryCommands.java @@ -16,13 +16,13 @@ default Response xadd(byte[] key, Map hash, XAddParams p Response xlen(byte[] key); - Response> xrange(byte[] key, byte[] start, byte[] end); + Response> xrange(byte[] key, byte[] start, byte[] end); - Response> xrange(byte[] key, byte[] start, byte[] end, int count); + Response> xrange(byte[] key, byte[] start, byte[] end, int count); - Response> xrevrange(byte[] key, byte[] end, byte[] start); + Response> xrevrange(byte[] key, byte[] end, byte[] start); - Response> xrevrange(byte[] key, byte[] end, byte[] start, int count); + Response> xrevrange(byte[] key, byte[] end, byte[] start, int count); Response xack(byte[] key, byte[] group, byte[]... ids); @@ -75,9 +75,9 @@ Response> xautoclaimJustId(byte[] key, byte[] groupName, byte[] con Response> xinfoConsumers(byte[] key, byte[] group); - Response> xread(XReadParams xReadParams, Map.Entry... streams); + Response> xread(XReadParams xReadParams, Map.Entry... streams); - Response> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, - Map.Entry... streams); + Response> xreadGroup(byte[] groupName, byte[] consumer, + XReadGroupParams xReadGroupParams, Map.Entry... streams); }