Skip to content

Commit

Permalink
Fix binary variants of XRANGE and XREAD commands (#3571)
Browse files Browse the repository at this point in the history
where each individual element in the list is actually a complex object rather than a simple byte array.
  • Loading branch information
sazzad16 authored Oct 11, 2023
1 parent 5cd40fc commit 2bb3989
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 59 deletions.
24 changes: 12 additions & 12 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -2402,24 +2402,24 @@ public final CommandObject<List<StreamEntry>> xrevrange(String key, String end,
return new CommandObject<>(commandArguments(XREVRANGE).key(key).add(end).add(start).add(COUNT).add(count), BuilderFactory.STREAM_ENTRY_LIST);
}

public final CommandObject<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end) {
public final CommandObject<List<Object>> xrange(byte[] key, byte[] start, byte[] end) {
return new CommandObject<>(commandArguments(XRANGE).key(key).add(start == null ? "-" : start).add(end == null ? "+" : end),
BuilderFactory.BINARY_LIST);
BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end, int count) {
public final CommandObject<List<Object>> xrange(byte[] key, byte[] start, byte[] end, int count) {
return new CommandObject<>(commandArguments(XRANGE).key(key).add(start == null ? "-" : start).add(end == null ? "+" : end)
.add(COUNT).add(count), BuilderFactory.BINARY_LIST);
.add(COUNT).add(count), BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start) {
public final CommandObject<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start) {
return new CommandObject<>(commandArguments(XREVRANGE).key(key).add(end == null ? "+" : end).add(start == null ? "-" : start),
BuilderFactory.BINARY_LIST);
BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
public final CommandObject<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
return new CommandObject<>(commandArguments(XREVRANGE).key(key).add(end == null ? "+" : end).add(start == null ? "-" : start)
.add(COUNT).add(count), BuilderFactory.BINARY_LIST);
.add(COUNT).add(count), BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<Long> xack(String key, String group, StreamEntryID... ids) {
Expand Down Expand Up @@ -2660,18 +2660,18 @@ public final CommandObject<List<Map.Entry<String, List<StreamEntry>>>> xreadGrou
return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE);
}

public final CommandObject<List<byte[]>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
public final CommandObject<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
for (Map.Entry<byte[], byte[]> entry : streams) {
args.key(entry.getKey());
}
for (Map.Entry<byte[], byte[]> entry : streams) {
args.add(entry.getValue());
}
return new CommandObject<>(args, BuilderFactory.BINARY_LIST);
return new CommandObject<>(args, BuilderFactory.RAW_OBJECT_LIST);
}

public final CommandObject<List<byte[]>> xreadGroup(byte[] groupName, byte[] consumer,
public final CommandObject<List<Object>> xreadGroup(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
CommandArguments args = commandArguments(XREADGROUP)
.add(GROUP).add(groupName).add(consumer)
Expand All @@ -2682,7 +2682,7 @@ public final CommandObject<List<byte[]>> xreadGroup(byte[] groupName, byte[] con
for (Map.Entry<byte[], byte[]> entry : streams) {
args.add(entry.getValue());
}
return new CommandObject<>(args, BuilderFactory.BINARY_LIST);
return new CommandObject<>(args, BuilderFactory.RAW_OBJECT_LIST);
}
// Stream commands

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4646,13 +4646,13 @@ public long hstrlen(final byte[] key, final byte[] field) {
}

@Override
public List<byte[]> xread(XReadParams xReadParams, Entry<byte[], byte[]>... streams) {
public List<Object> xread(XReadParams xReadParams, Entry<byte[], byte[]>... streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public List<byte[]> xreadGroup(byte[] groupName, byte[] consumer,
public List<Object> xreadGroup(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Entry<byte[], byte[]>... streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
Expand All @@ -4671,25 +4671,25 @@ public long xlen(byte[] key) {
}

@Override
public List<byte[]> xrange(byte[] key, byte[] start, byte[] end) {
public List<Object> xrange(byte[] key, byte[] start, byte[] end) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xrange(key, start, end));
}

@Override
public List<byte[]> xrange(byte[] key, byte[] start, byte[] end, int count) {
public List<Object> xrange(byte[] key, byte[] start, byte[] end, int count) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xrange(key, start, end, count));
}

@Override
public List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start) {
public List<Object> xrevrange(byte[] key, byte[] end, byte[] start) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xrevrange(key, end, start));
}

@Override
public List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
public List<Object> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xrevrange(key, end, start, count));
}
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/redis/clients/jedis/PipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -2968,22 +2968,22 @@ public Response<Long> xlen(byte[] key) {
}

@Override
public Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end) {
public Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end) {
return appendCommand(commandObjects.xrange(key, start, end));
}

@Override
public Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end, int count) {
public Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end, int count) {
return appendCommand(commandObjects.xrange(key, start, end, count));
}

@Override
public Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start) {
public Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start) {
return appendCommand(commandObjects.xrevrange(key, end, start));
}

@Override
public Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
public Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
return appendCommand(commandObjects.xrevrange(key, end, start, count));
}

Expand Down Expand Up @@ -3088,12 +3088,13 @@ public Response<List<Object>> xinfoConsumers(byte[] key, byte[] group) {
}

@Override
public Response<List<byte[]>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
public Response<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
return appendCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public Response<List<byte[]>> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
public Response<List<Object>> xreadGroup(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/redis/clients/jedis/TransactionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -3135,22 +3135,22 @@ public Response<Long> xlen(byte[] key) {
}

@Override
public Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end) {
public Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end) {
return appendCommand(commandObjects.xrange(key, start, end));
}

@Override
public Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end, int count) {
public Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end, int count) {
return appendCommand(commandObjects.xrange(key, start, end, count));
}

@Override
public Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start) {
public Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start) {
return appendCommand(commandObjects.xrevrange(key, end, start));
}

@Override
public Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
public Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
return appendCommand(commandObjects.xrevrange(key, end, start, count));
}

Expand Down Expand Up @@ -3255,12 +3255,12 @@ public Response<List<Object>> xinfoConsumers(byte[] key, byte[] group) {
}

@Override
public Response<List<byte[]>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
public Response<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
return appendCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public Response<List<byte[]>> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
public Response<List<Object>> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -3058,22 +3058,22 @@ public long xlen(byte[] key) {
}

@Override
public List<byte[]> xrange(byte[] key, byte[] start, byte[] end) {
public List<Object> xrange(byte[] key, byte[] start, byte[] end) {
return executeCommand(commandObjects.xrange(key, start, end));
}

@Override
public List<byte[]> xrange(byte[] key, byte[] start, byte[] end, int count) {
public List<Object> xrange(byte[] key, byte[] start, byte[] end, int count) {
return executeCommand(commandObjects.xrange(key, start, end, count));
}

@Override
public List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start) {
public List<Object> xrevrange(byte[] key, byte[] end, byte[] start) {
return executeCommand(commandObjects.xrevrange(key, end, start));
}

@Override
public List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
public List<Object> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
return executeCommand(commandObjects.xrevrange(key, end, start, count));
}

Expand Down Expand Up @@ -3178,12 +3178,12 @@ public List<Object> xinfoConsumers(byte[] key, byte[] group) {
}

@Override
public List<byte[]> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
public List<Object> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
return executeCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public List<byte[]> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
public List<Object> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
return executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}
// Stream commands
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ default byte[] xadd(byte[] key, Map<byte[], byte[]> hash, XAddParams params) {

long xlen(byte[] key);

List<byte[]> xrange(byte[] key, byte[] start, byte[] end);
List<Object> xrange(byte[] key, byte[] start, byte[] end);

List<byte[]> xrange(byte[] key, byte[] start, byte[] end, int count);
List<Object> xrange(byte[] key, byte[] start, byte[] end, int count);

List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start);
List<Object> xrevrange(byte[] key, byte[] end, byte[] start);

List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start, int count);
List<Object> xrevrange(byte[] key, byte[] end, byte[] start, int count);

long xack(byte[] key, byte[] group, byte[]... ids);

Expand Down Expand Up @@ -74,9 +74,9 @@ List<Object> xautoclaimJustId(byte[] key, byte[] groupName, byte[] consumerName,

List<Object> xinfoConsumers(byte[] key, byte[] group);

List<byte[]> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams);
List<Object> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams);

List<byte[]> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
List<Object> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
Map.Entry<byte[], byte[]>... streams);

}
20 changes: 10 additions & 10 deletions src/main/java/redis/clients/jedis/commands/StreamCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,6 @@ default StreamEntryID xadd(String key, Map<String, String> hash, XAddParams para
*/
long xgroupDelConsumer(String key, String groupName, String consumerName);

/**
* XPENDING key group
*/
StreamPendingSummary xpending(String key, String groupName);

/**
* XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
*/
List<StreamPendingEntry> xpending(String key, String groupName, XPendingParams params);

/**
* XDEL key ID [ID ...]
*/
Expand All @@ -148,6 +138,16 @@ default StreamEntryID xadd(String key, Map<String, String> hash, XAddParams para
*/
long xtrim(String key, XTrimParams params);

/**
* XPENDING key group
*/
StreamPendingSummary xpending(String key, String groupName);

/**
* XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
*/
List<StreamPendingEntry> xpending(String key, String groupName, XPendingParams params);

/**
* {@code XCLAIM key group consumer min-idle-time <ID-1> ... <ID-N>
* [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ default Response<byte[]> xadd(byte[] key, Map<byte[], byte[]> hash, XAddParams p

Response<Long> xlen(byte[] key);

Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end);
Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end);

Response<List<byte[]>> xrange(byte[] key, byte[] start, byte[] end, int count);
Response<List<Object>> xrange(byte[] key, byte[] start, byte[] end, int count);

Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start);
Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start);

Response<List<byte[]>> xrevrange(byte[] key, byte[] end, byte[] start, int count);
Response<List<Object>> xrevrange(byte[] key, byte[] end, byte[] start, int count);

Response<Long> xack(byte[] key, byte[] group, byte[]... ids);

Expand Down Expand Up @@ -75,9 +75,9 @@ Response<List<Object>> xautoclaimJustId(byte[] key, byte[] groupName, byte[] con

Response<List<Object>> xinfoConsumers(byte[] key, byte[] group);

Response<List<byte[]>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams);
Response<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams);

Response<List<byte[]>> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
Map.Entry<byte[], byte[]>... streams);
Response<List<Object>> xreadGroup(byte[] groupName, byte[] consumer,
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams);

}

0 comments on commit 2bb3989

Please sign in to comment.