From 8d58aa4dd22778e957965d2b2e56faf09e7f6e4a Mon Sep 17 00:00:00 2001 From: Nathan Lo Date: Mon, 30 Sep 2024 16:52:32 -0400 Subject: [PATCH 1/5] Fix XPENDING serialization issue --- src/commands/cmd_stream.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index f02d5b34d13..e2d0a3a9dbf 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -896,7 +896,7 @@ class CommandXPending : public Commander { static Status SendResults([[maybe_unused]] Connection *conn, std::string *output, StreamGetPendingEntryResult &results) { - output->append(redis::MultiLen(3 + results.consumer_infos.size())); + output->append(redis::MultiLen(4)); output->append(redis::Integer(results.pending_number)); output->append(redis::BulkString(results.first_entry_id.ToString())); output->append(redis::BulkString(results.last_entry_id.ToString())); From ec4a56f45c6cc9d4e08874407a39eebcb93cb85f Mon Sep 17 00:00:00 2001 From: Nathan Lo Date: Mon, 30 Sep 2024 17:50:33 -0400 Subject: [PATCH 2/5] Add test case --- tests/gocase/unit/type/stream/stream_test.go | 21 ++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index b50b230f6c0..515f30bf14c 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -2127,6 +2127,27 @@ func TestStreamOffset(t *testing.T) { Higher: "2-2", Consumers: map[string]int64{"myconsumer": 2}, }, r1) + + // Add a second consumer and check that XPENDING still works + consumerName2 := "myconsumer2" + err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName2, + Streams: []string{streamName, ">"}, + Count: 1, + NoAck: false, + }).Err() + require.NoError(t, err) + + r1, err1 = rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err1) + + require.Equal(t, &redis.XPending{ + Count: 2, + Lower: "1-0", + Higher: "2-2", + Consumers: map[string]int64{"myconsumer": 2, "myconsumer2": 0}, + }, r1) }) } From bf377d28c84bd4a97aedc852370a8c9519ed689f Mon Sep 17 00:00:00 2001 From: Nathan Lo Date: Tue, 1 Oct 2024 13:26:02 -0400 Subject: [PATCH 3/5] Fix hanging test --- tests/gocase/unit/type/stream/stream_test.go | 21 ++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index 515f30bf14c..7a6c3172195 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -2056,9 +2056,11 @@ func TestStreamOffset(t *testing.T) { t.Run("XPending with different kinds of commands", func(t *testing.T) { streamName := "mystream" groupName := "mygroup" + require.NoError(t, rdb.Del(ctx, streamName).Err()) r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result() require.NoError(t, err) + require.Equal(t, int64(0), r) require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ Stream: streamName, @@ -2066,6 +2068,7 @@ func TestStreamOffset(t *testing.T) { Values: []string{"field1", "data1"}, }).Err()) require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + consumerName := "myconsumer" err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: groupName, @@ -2083,7 +2086,7 @@ func TestStreamOffset(t *testing.T) { Count: 1, Lower: "1-0", Higher: "1-0", - Consumers: map[string]int64{"myconsumer": 1}, + Consumers: map[string]int64{consumerName: 1}, }, r1) require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ @@ -2113,7 +2116,7 @@ func TestStreamOffset(t *testing.T) { Count: 3, Lower: "1-0", Higher: "2-2", - Consumers: map[string]int64{"myconsumer": 3}, + Consumers: map[string]int64{consumerName: 3}, }, r1) require.NoError(t, rdb.XAck(ctx, streamName, groupName, "2-0").Err()) @@ -2125,10 +2128,16 @@ func TestStreamOffset(t *testing.T) { Count: 2, Lower: "1-0", Higher: "2-2", - Consumers: map[string]int64{"myconsumer": 2}, + Consumers: map[string]int64{consumerName: 2}, }, r1) // Add a second consumer and check that XPENDING still works + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "3-0", + Values: []string{"field1", "data1"}, + }).Err()) + consumerName2 := "myconsumer2" err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: groupName, @@ -2143,10 +2152,10 @@ func TestStreamOffset(t *testing.T) { require.NoError(t, err1) require.Equal(t, &redis.XPending{ - Count: 2, + Count: 3, Lower: "1-0", - Higher: "2-2", - Consumers: map[string]int64{"myconsumer": 2, "myconsumer2": 0}, + Higher: "3-0", + Consumers: map[string]int64{consumerName: 2, consumerName2: 1}, }, r1) }) } From 117c36fa7770cf14e83832729f2e9f6cab8b2147 Mon Sep 17 00:00:00 2001 From: Nathan Lo Date: Tue, 1 Oct 2024 16:56:04 -0400 Subject: [PATCH 4/5] Fix and test XPENDING when there are no pending messages --- src/commands/cmd_stream.cc | 30 +++++++++---- tests/gocase/unit/type/stream/stream_test.go | 44 +++++++++++++++++++- 2 files changed, 64 insertions(+), 10 deletions(-) diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index e2d0a3a9dbf..c5f64099fea 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -896,15 +896,27 @@ class CommandXPending : public Commander { static Status SendResults([[maybe_unused]] Connection *conn, std::string *output, StreamGetPendingEntryResult &results) { - output->append(redis::MultiLen(4)); - output->append(redis::Integer(results.pending_number)); - output->append(redis::BulkString(results.first_entry_id.ToString())); - output->append(redis::BulkString(results.last_entry_id.ToString())); - output->append(redis::MultiLen(results.consumer_infos.size())); - for (const auto &entry : results.consumer_infos) { - output->append(redis::MultiLen(2)); - output->append(redis::BulkString(entry.first)); - output->append(redis::BulkString(std::to_string(entry.second))); + // NOTE: In the case that our stream has no pending elements, Redis will + // return NilString for the first and last entry IDs, and a nil array + // for the consumer infos. Make this a special case to maintain consistency + // with Redis. + if (results.pending_number == 0) { + output->append(redis::MultiLen(4)); + output->append(redis::Integer(0)); + output->append(conn->NilString()); + output->append(conn->NilString()); + output->append(conn->NilArray()); + } else { + output->append(redis::MultiLen(4)); + output->append(redis::Integer(results.pending_number)); + output->append(redis::BulkString(results.first_entry_id.ToString())); + output->append(redis::BulkString(results.last_entry_id.ToString())); + output->append(redis::MultiLen(results.consumer_infos.size())); + for (const auto &entry : results.consumer_infos) { + output->append(redis::MultiLen(2)); + output->append(redis::BulkString(entry.first)); + output->append(redis::BulkString(std::to_string(entry.second))); + } } return Status::OK(); diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index 7a6c3172195..e31549f8258 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -2060,8 +2060,8 @@ func TestStreamOffset(t *testing.T) { require.NoError(t, rdb.Del(ctx, streamName).Err()) r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result() require.NoError(t, err) - require.Equal(t, int64(0), r) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ Stream: streamName, ID: "1-0", @@ -2158,6 +2158,48 @@ func TestStreamOffset(t *testing.T) { Consumers: map[string]int64{consumerName: 2, consumerName2: 1}, }, r1) }) + + t.Run("XPENDING on a consumer group with no pending messages", func(t *testing.T) { + streamName := "stream" + groupName := "group" + consumerName := "consumer" + messageID := "1-0" + + // Remove any existing data + require.NoError(t, rdb.Del(ctx, streamName).Err()) + r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result() + require.Equal(t, int64(0), r) + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: messageID, + Values: []string{"key", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + + // Have the consumer claim the message with ID [messageID] + err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName, + Streams: []string{streamName, ">"}, + Count: 1, + NoAck: false, + }).Err() + require.NoError(t, err) + + // Acknowledge the message, so no messages remain pending + require.NoError(t, rdb.XAck(ctx, streamName, groupName, messageID).Err()) + + // Check that XPENDING sets the min and max to nil, matching Redis' behavior + pending, err := rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err) + require.Equal(t, &redis.XPending{ + Count: 0, + Lower: "", + Higher: "", + Consumers: map[string]int64{}, + }, pending) + }) } func parseStreamEntryID(id string) (ts int64, seqNum int64) { From 02f8e81670ef01a578b4105f981cee5db469da37 Mon Sep 17 00:00:00 2001 From: Nathan Lo Date: Thu, 3 Oct 2024 16:55:27 -0400 Subject: [PATCH 5/5] Check error; fix linter issues --- tests/gocase/unit/type/stream/stream_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index e31549f8258..1f6a35c05ab 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -2164,10 +2164,11 @@ func TestStreamOffset(t *testing.T) { groupName := "group" consumerName := "consumer" messageID := "1-0" - + // Remove any existing data require.NoError(t, rdb.Del(ctx, streamName).Err()) r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result() + require.NoError(t, err) require.Equal(t, int64(0), r) require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ @@ -2194,9 +2195,9 @@ func TestStreamOffset(t *testing.T) { pending, err := rdb.XPending(ctx, streamName, groupName).Result() require.NoError(t, err) require.Equal(t, &redis.XPending{ - Count: 0, - Lower: "", - Higher: "", + Count: 0, + Lower: "", + Higher: "", Consumers: map[string]int64{}, }, pending) })