diff --git a/src/commands/cmd_search.cc b/src/commands/cmd_search.cc index 1928672c93a..ea42f26977c 100644 --- a/src/commands/cmd_search.cc +++ b/src/commands/cmd_search.cc @@ -358,13 +358,14 @@ class CommandFTDrop : public Commander { }; }; -REDIS_REGISTER_COMMANDS(MakeCmdAttr("ft.create", -2, "write exclusive no-multi no-script", 0, 0, 0), - MakeCmdAttr("ft.searchsql", 2, "read-only", 0, 0, 0), - MakeCmdAttr("ft.search", -3, "read-only", 0, 0, 0), - MakeCmdAttr("ft.explainsql", -2, "read-only", 0, 0, 0), - MakeCmdAttr("ft.explain", -3, "read-only", 0, 0, 0), - MakeCmdAttr("ft.info", 2, "read-only", 0, 0, 0), - MakeCmdAttr("ft._list", 1, "read-only", 0, 0, 0), - MakeCmdAttr("ft.dropindex", 2, "write exclusive no-multi no-script", 0, 0, 0)); +// REDIS_REGISTER_COMMANDS(MakeCmdAttr("ft.create", -2, "write exclusive no-multi no-script", 0, 0, 0), +// MakeCmdAttr("ft.searchsql", 2, "read-only", 0, 0, 0), +// MakeCmdAttr("ft.search", -3, "read-only", 0, 0, 0), +// MakeCmdAttr("ft.explainsql", -2, "read-only", 0, 0, 0), +// MakeCmdAttr("ft.explain", -3, "read-only", 0, 0, 0), +// MakeCmdAttr("ft.info", 2, "read-only", 0, 0, 0), +// MakeCmdAttr("ft._list", 1, "read-only", 0, 0, 0), +// MakeCmdAttr("ft.dropindex", 2, "write exclusive no-multi no-script", 0, 0, +// 0)); } // namespace redis diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index 7faeb9c49cc..ee608be2158 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -655,10 +655,10 @@ class CommandXInfo : public Commander { count_ = *parse_result; } - } else if (val == "groups" && args.size() == 3) { - subcommand_ = "groups"; - } else if (val == "consumers" && args.size() == 4) { - subcommand_ = "consumers"; + // } else if (val == "groups" && args.size() == 3) { + // subcommand_ = "groups"; + // } else if (val == "consumers" && args.size() == 4) { + // subcommand_ = "consumers"; } else { return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments}; } @@ -1736,19 +1736,20 @@ class CommandXSetId : public Commander { std::optional entries_added_; }; -REDIS_REGISTER_COMMANDS(MakeCmdAttr("xack", -4, "write no-dbsize-check", 1, 1, 1), - MakeCmdAttr("xadd", -5, "write", 1, 1, 1), - MakeCmdAttr("xdel", -3, "write no-dbsize-check", 1, 1, 1), - MakeCmdAttr("xclaim", -6, "write", 1, 1, 1), - MakeCmdAttr("xautoclaim", -6, "write", 1, 1, 1), - MakeCmdAttr("xgroup", -4, "write", 2, 2, 1), - MakeCmdAttr("xlen", -2, "read-only", 1, 1, 1), - MakeCmdAttr("xinfo", -2, "read-only", 0, 0, 0), - MakeCmdAttr("xrange", -4, "read-only", 1, 1, 1), - MakeCmdAttr("xrevrange", -2, "read-only", 1, 1, 1), - MakeCmdAttr("xread", -4, "read-only", 0, 0, 0), - MakeCmdAttr("xreadgroup", -7, "write", 0, 0, 0), - MakeCmdAttr("xtrim", -4, "write no-dbsize-check", 1, 1, 1), - MakeCmdAttr("xsetid", -3, "write", 1, 1, 1)) +REDIS_REGISTER_COMMANDS( + // MakeCmdAttr("xack", -4, "write no-dbsize-check", 1, 1, 1), + MakeCmdAttr("xadd", -5, "write", 1, 1, 1), + MakeCmdAttr("xdel", -3, "write no-dbsize-check", 1, 1, 1), + // MakeCmdAttr("xclaim", -6, "write", 1, 1, 1), + // MakeCmdAttr("xautoclaim", -6, "write", 1, 1, 1), + // MakeCmdAttr("xgroup", -4, "write", 2, 2, 1), + MakeCmdAttr("xlen", -2, "read-only", 1, 1, 1), + MakeCmdAttr("xinfo", -2, "read-only", 0, 0, 0), + MakeCmdAttr("xrange", -4, "read-only", 1, 1, 1), + MakeCmdAttr("xrevrange", -2, "read-only", 1, 1, 1), + MakeCmdAttr("xread", -4, "read-only", 0, 0, 0), + // MakeCmdAttr("xreadgroup", -7, "write", 0, 0, 0), + MakeCmdAttr("xtrim", -4, "write no-dbsize-check", 1, 1, 1), + MakeCmdAttr("xsetid", -3, "write", 1, 1, 1)) } // namespace redis diff --git a/tests/gocase/unit/search/search_test.go b/tests/gocase/unit/search/search_test.go index bb87c19fc5f..651323c1710 100644 --- a/tests/gocase/unit/search/search_test.go +++ b/tests/gocase/unit/search/search_test.go @@ -29,6 +29,8 @@ import ( ) func TestSearch(t *testing.T) { + t.Skip("search commands is disabled") + srv := util.StartServer(t, map[string]string{}) defer srv.Close() diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index b8958e70f31..4ba461ff341 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -867,1043 +867,1043 @@ func TestStreamOffset(t *testing.T) { require.EqualValues(t, providedSeqNum, seqNum) }) - t.Run("XGROUP CREATE with different kinds of commands and XGROUP DESTROY", func(t *testing.T) { - streamName := "test-stream-a" - groupName := "test-group-a" - require.NoError(t, rdb.Del(ctx, streamName).Err()) - // No such stream (No such key) - require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$").Err()) - require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD", "10").Err()) - require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD").Err()) - require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "MKSTREAM", "ENTRIESREAD").Err()) - require.NoError(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "MKSTREAM").Err()) - require.NoError(t, rdb.XInfoStream(ctx, streamName).Err()) - require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$").Err()) - // Invalid syntax - groupName = "test-group-b" - require.Error(t, rdb.Do(ctx, "XGROUP", "CREAT", streamName, groupName, "$").Err()) - require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIEREAD", "10").Err()) - require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD", "-10").Err()) - require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, "1test-group-c", "$").Err()) - - require.NoError(t, rdb.Del(ctx, "myStream").Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "myStream", Values: []string{"iTeM", "1", "vAluE", "a"}}).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, "myStream", "myGroup", "$").Err()) - result, err := rdb.XGroupDestroy(ctx, "myStream", "myGroup").Result() - require.NoError(t, err) - require.Equal(t, int64(1), result) - result, err = rdb.XGroupDestroy(ctx, "myStream", "myGroup").Result() - require.NoError(t, err) - require.Equal(t, int64(0), result) - }) - - t.Run("XGROUP CREATECONSUMER with different kinds of commands", func(t *testing.T) { - streamName := "test-stream" - groupName := "test-group" - consumerName := "test-consumer" - require.NoError(t, rdb.Del(ctx, streamName).Err()) - //No such stream - require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "1-0", - Values: []string{"data", "a"}, - }).Err()) - //no such group - require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err()) - - r := rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Val() - require.Equal(t, int64(1), r) - r = rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Val() - require.Equal(t, int64(0), r) - }) - - t.Run("XGROUP DELCONSUMER with different kinds of commands", func(t *testing.T) { - streamName := "test-stream" - groupName := "test-group" - consumerName := "test-consumer" - require.NoError(t, rdb.Del(ctx, streamName).Err()) - //No such stream - require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "1-0", - Values: []string{"data", "a"}, - }).Err()) - //no such group - require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err()) - require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) - - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "*", - Values: []string{"data1", "a1"}, - }).Err()) - require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamName, ">"}, - Count: 1, - NoAck: false, - }).Err()) - ri, erri := rdb.XInfoGroups(ctx, streamName).Result() - require.NoError(t, erri) - require.Equal(t, int64(1), ri[0].Consumers) - require.Equal(t, int64(1), ri[0].Pending) - - r, err := rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Result() - require.NoError(t, err) - require.Equal(t, int64(1), r) - ri, erri = rdb.XInfoGroups(ctx, streamName).Result() - require.NoError(t, erri) - require.Equal(t, int64(0), ri[0].Consumers) - require.Equal(t, int64(0), ri[0].Pending) - - require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "*", - Values: []string{"data2", "a2"}, - }).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "*", - Values: []string{"data3", "a3"}, - }).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "*", - Values: []string{"data4", "a4"}, - }).Err()) - require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamName, ">"}, - Count: 3, - NoAck: false, - }).Err()) - ri, erri = rdb.XInfoGroups(ctx, streamName).Result() - require.NoError(t, erri) - require.Equal(t, int64(1), ri[0].Consumers) - require.Equal(t, int64(3), ri[0].Pending) - r, err = rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Result() - require.NoError(t, err) - require.Equal(t, int64(3), r) - ri, erri = rdb.XInfoGroups(ctx, streamName).Result() - require.NoError(t, erri) - require.Equal(t, int64(0), ri[0].Consumers) - require.Equal(t, int64(0), ri[0].Pending) - }) - - t.Run("XGROUP SETID with different kinds of commands", func(t *testing.T) { - streamName := "test-stream" - groupName := "test-group" - require.NoError(t, rdb.Del(ctx, streamName).Err()) - //No such stream - require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, "$").Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "1-0", - Values: []string{"data", "a"}, - }).Err()) - //No such group - require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, "$").Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err()) - - require.NoError(t, rdb.XGroupSetID(ctx, streamName, groupName, "0-0").Err()) - require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entries", "100").Err()) - require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entriesread", "-100").Err()) - require.NoError(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entriesread", "100").Err()) - }) - - t.Run("XINFO GROUPS and XINFO CONSUMERS", func(t *testing.T) { - streamName := "test-stream" - group1 := "t1" - group2 := "t2" - consumer1 := "c1" - consumer2 := "c2" - consumer3 := "c3" - require.NoError(t, rdb.Del(ctx, streamName).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "1-0", - Values: []string{"data", "a"}, - }).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, group1, "$").Err()) - r := rdb.XInfoGroups(ctx, streamName).Val() - require.Equal(t, group1, r[0].Name) - require.Equal(t, int64(0), r[0].Consumers) - require.Equal(t, int64(0), r[0].Pending) - require.Equal(t, "1-0", r[0].LastDeliveredID) - require.Equal(t, int64(0), r[0].EntriesRead) - require.Equal(t, int64(0), r[0].Lag) - - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "2-0", - Values: []string{"data1", "b"}, - }).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, group2, "$").Err()) - r = rdb.XInfoGroups(ctx, streamName).Val() - require.Equal(t, group2, r[1].Name) - require.Equal(t, "2-0", r[1].LastDeliveredID) - - require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group1, consumer1).Err()) - require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group1, consumer2).Err()) - require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group2, consumer3).Err()) - r = rdb.XInfoGroups(ctx, streamName).Val() - require.Equal(t, int64(2), r[0].Consumers) - require.Equal(t, int64(1), r[1].Consumers) - - r1 := rdb.XInfoConsumers(ctx, streamName, group1).Val() - require.Equal(t, consumer1, r1[0].Name) - require.Equal(t, consumer2, r1[1].Name) - r1 = rdb.XInfoConsumers(ctx, streamName, group2).Val() - require.Equal(t, consumer3, r1[0].Name) - }) - - t.Run("XINFO after delete pending message and related consumer, for issue #2350", func(t *testing.T) { - streamName := "test-stream-2350" - groupName := "test-group-2350" - consumerName := "test-consumer-2350" - require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "$").Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "*", - Values: []string{"testing", "overflow"}, - }).Err()) - readRsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamName, ">"}, - Count: 1, - NoAck: false, - }) - require.NoError(t, readRsp.Err()) - require.Len(t, readRsp.Val(), 1) - streamRsp := readRsp.Val()[0] - require.Len(t, streamRsp.Messages, 1) - msgID := streamRsp.Messages[0] - require.NoError(t, rdb.XAck(ctx, streamName, groupName, msgID.ID).Err()) - require.NoError(t, rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Err()) - infoRsp := rdb.XInfoGroups(ctx, streamName) - require.NoError(t, infoRsp.Err()) - infoGroups := infoRsp.Val() - require.Len(t, infoGroups, 1) - infoGroup := infoGroups[0] - require.Equal(t, groupName, infoGroup.Name) - require.Equal(t, int64(0), infoGroup.Consumers) - require.Equal(t, int64(0), infoGroup.Pending) - require.Equal(t, msgID.ID, infoGroup.LastDeliveredID) - }) - - t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue #2109", func(t *testing.T) { - streamName := "test-stream" - group := "group" - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "*", - Values: []string{"data1", "b"}, - }).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, group, "0").Err()) - require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group, "consumer").Err()) - require.NoError(t, rdb.XRead(ctx, &redis.XReadArgs{ - Streams: []string{streamName, "0"}, - }).Err()) - }) - - t.Run("XREADGROUP with different kinds of commands", func(t *testing.T) { - streamName := "mystream" - groupName := "mygroup" - require.NoError(t, rdb.Del(ctx, streamName).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "1-0", - Values: []string{"field1", "data1"}, - }).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) - consumerName := "myconsumer" - r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamName, ">"}, - Count: 1, - NoAck: false, - }).Result() - require.NoError(t, err) - require.Equal(t, []redis.XStream{{ - Stream: streamName, - Messages: []redis.XMessage{{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}}}, - }}, r) - - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "2-0", - Values: []string{"field2", "data2"}, - }).Err()) - r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamName, ">"}, - Count: 1, - NoAck: false, - }).Result() - require.NoError(t, err) - require.Equal(t, []redis.XStream{{ - Stream: streamName, - Messages: []redis.XMessage{{ID: "2-0", Values: map[string]interface{}{"field2": "data2"}}}, - }}, r) - - r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamName, "0"}, - Count: 2, - NoAck: false, - }).Result() - require.NoError(t, err) - require.Equal(t, []redis.XStream{{ - Stream: streamName, - Messages: []redis.XMessage{{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}}, - {ID: "2-0", Values: map[string]interface{}{"field2": "data2"}}}, - }}, r) - - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "3-0", - Values: []string{"field3", "data3"}, - }).Err()) - r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamName, ">"}, - Count: 1, - NoAck: true, - }).Result() - require.NoError(t, err) - require.Equal(t, []redis.XStream{{ - Stream: streamName, - Messages: []redis.XMessage{{ID: "3-0", Values: map[string]interface{}{"field3": "data3"}}}, - }}, r) - r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamName, "0"}, - Count: 2, - NoAck: false, - }).Result() - require.NoError(t, err) - require.Equal(t, []redis.XStream{{ - Stream: streamName, - Messages: []redis.XMessage{{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}}, - {ID: "2-0", Values: map[string]interface{}{"field2": "data2"}}}, - }}, r) - - c := srv.NewClient() - defer func() { require.NoError(t, c.Close()) }() - ch := make(chan []redis.XStream) - go func() { - ch <- c.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamName, ">"}, - Count: 2, - Block: 10 * time.Second, - NoAck: false, - }).Val() - }() - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "4-0", - Values: []string{"field4", "data4"}, - }).Err()) - r = <-ch - require.Equal(t, []redis.XStream{{ - Stream: streamName, - Messages: []redis.XMessage{{ID: "4-0", Values: map[string]interface{}{"field4": "data4"}}}, - }}, r) - - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "5-0", - Values: []string{"field5", "data5"}, - }).Err()) - require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamName, ">"}, - Count: 1, - NoAck: false, - }).Err()) - require.NoError(t, rdb.XDel(ctx, streamName, "5-0").Err()) - r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamName, "5"}, - Count: 1, - NoAck: false, - }).Result() - require.NoError(t, err) - require.Equal(t, []redis.XStream{{ - Stream: streamName, - Messages: []redis.XMessage{{ID: "5-0", Values: map[string]interface{}(nil)}}, - }}, r) - }) - - t.Run("Check xreadgroup fetches the newest data after create consumer in the command", func(t *testing.T) { - streamName := "mystream" - groupName := "mygroup" - require.NoError(t, rdb.Del(ctx, streamName).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "1-0", - Values: []string{"field1", "data1"}, - }).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) - consumerName := "myconsumer" - err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamName, ">"}, - Count: 1, - NoAck: false, - }).Err() - require.NoError(t, err) - ri, erri := rdb.XInfoGroups(ctx, streamName).Result() - require.NoError(t, erri) - require.Equal(t, int64(1), ri[0].Consumers) - }) - - t.Run("XACK with different kinds of commands", func(t *testing.T) { - streamName := "mystream" - groupName := "mygroup" - require.NoError(t, rdb.Del(ctx, streamName).Err()) - r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result() - require.NoError(t, err) - require.Equal(t, int64(0), r) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "1-0", - Values: []string{"field1", "data1"}, - }).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) - consumerName := "myconsumer" - err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamName, ">"}, - Count: 1, - NoAck: false, - }).Err() - require.NoError(t, err) - r, err = rdb.XAck(ctx, streamName, groupName, "1-0").Result() - require.NoError(t, err) - require.Equal(t, int64(1), r) - - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "2-0", - Values: []string{"field1", "data1"}, - }).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "3-0", - Values: []string{"field1", "data1"}, - }).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "4-0", - Values: []string{"field1", "data1"}, - }).Err()) - err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamName, ">"}, - Count: 3, - NoAck: false, - }).Err() - require.NoError(t, err) - r, err = rdb.XAck(ctx, streamName, groupName, "2-0", "3-0", "4-0").Result() - require.NoError(t, err) - require.Equal(t, int64(3), r) - }) - - t.Run("Simple XCLAIM command tests", func(t *testing.T) { - streamName := "mystream" - groupName := "mygroup" - consumerName := "myconsumer" - consumer1Name := "myconsumer1" - require.NoError(t, rdb.Del(ctx, streamName).Err()) - - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "1-0", - Values: []string{"field1", "data1"}, - }).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) - r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamName, ">"}, - Count: 1, - NoAck: false, - }).Result() - require.NoError(t, err) - require.Equal(t, []redis.XStream{{ - Stream: streamName, - Messages: []redis.XMessage{{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}}}, - }}, r) - - claimedMessages, err := rdb.XClaim(ctx, &redis.XClaimArgs{ - Stream: streamName, - Group: groupName, - Consumer: consumer1Name, - MinIdle: 0, - Messages: []string{"1-0"}, - }).Result() - require.NoError(t, err) - require.Len(t, claimedMessages, 1, "Expected to claim 1 message") - require.Equal(t, "1-0", claimedMessages[0].ID, "Expected claimed message ID to match") - - time.Sleep(2000 * time.Millisecond) - minIdleTime := 1000 * time.Millisecond - claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{ - Stream: streamName, - Group: groupName, - Consumer: consumerName, - MinIdle: minIdleTime, - Messages: []string{"1-0"}, - }).Result() - require.NoError(t, err) - require.Len(t, claimedMessages, 1, "Expected to claim 1 message if idle time is large enough") - require.Equal(t, "1-0", claimedMessages[0].ID, "Expected claimed message ID to match") - - minIdleTime = 60000 * time.Millisecond - claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{ - Stream: streamName, - Group: groupName, - Consumer: consumer1Name, - MinIdle: minIdleTime, - Messages: []string{"1-0"}, - }).Result() - - require.NoError(t, err) - require.Empty(t, claimedMessages, "Expected no messages to be claimed due to insufficient idle time") - }) - - t.Run("XCLAIM with different timing situations and options", func(t *testing.T) { - streamName := "mystream" - groupName := "mygroup" - consumerName := "myconsumer" - consumer1Name := "myconsumer1" - require.NoError(t, rdb.Del(ctx, streamName).Err()) - - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "1-0", - Values: []string{"field1", "data1"}, - }).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) - r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamName, ">"}, - Count: 1, - NoAck: false, - }).Result() - require.NoError(t, err) - require.Equal(t, []redis.XStream{{ - Stream: streamName, - Messages: []redis.XMessage{{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}}}, - }}, r) - - rawClaimedMessages, err := rdb.Do(ctx, "XCLAIM", streamName, groupName, consumer1Name, "0", "1-0", "IDLE", "5000").Result() - require.NoError(t, err) - messages, ok := rawClaimedMessages.([]interface{}) - require.True(t, ok, "Expected the result to be a slice of interface{}") - firstMsg, ok := messages[0].([]interface{}) - require.True(t, ok, "Expected message details to be a slice of interface{}") - msgID, ok := firstMsg[0].(string) - require.True(t, ok, "Expected message ID to be a string") - require.Equal(t, "1-0", msgID, "Expected claimed message ID to match") - - claimedMessages, err := rdb.XClaim(ctx, &redis.XClaimArgs{ - Stream: streamName, - Group: groupName, - Consumer: consumerName, - MinIdle: 2000 * time.Millisecond, - Messages: []string{"1-0"}, - }).Result() - require.NoError(t, err) - require.Len(t, claimedMessages, 1, "Expected to claim 1 message if idle time is large enough") - require.Equal(t, "1-0", claimedMessages[0].ID, "Expected claimed message ID to match") - - tenSecondsAgo := time.Now().Add(-10 * time.Second).UnixMilli() - rawClaimedMessages, err = rdb.Do(ctx, "XCLAIM", streamName, groupName, consumer1Name, "0", "1-0", "TIME", tenSecondsAgo).Result() - require.NoError(t, err) - messages, ok = rawClaimedMessages.([]interface{}) - require.True(t, ok, "Expected the result to be a slice of interface{}") - firstMsg, ok = messages[0].([]interface{}) - require.True(t, ok, "Expected message details to be a slice of interface{}") - msgID, ok = firstMsg[0].(string) - require.True(t, ok, "Expected message ID to be a string") - require.Equal(t, "1-0", msgID, "Expected claimed message ID to match") - - claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{ - Stream: streamName, - Group: groupName, - Consumer: consumerName, - MinIdle: 5000 * time.Millisecond, - Messages: []string{"1-0"}, - }).Result() - require.NoError(t, err) - require.Len(t, claimedMessages, 1, "Expected to claim 1 message if idle time is large enough") - require.Equal(t, "1-0", claimedMessages[0].ID, "Expected claimed message ID to match") - }) - - t.Run("XCLAIM command with different options", func(t *testing.T) { - streamName := "mystream" - groupName := "mygroup" - consumerName := "myconsumer" - consumer1Name := "myconsumer1" - - require.NoError(t, rdb.Del(ctx, streamName).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "1-0", - Values: []string{"field1", "data1"}, - }).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) - - rawClaimedMessages, err := rdb.Do(ctx, "XCLAIM", streamName, groupName, consumerName, "0", "1-0", "FORCE").Result() - require.NoError(t, err) - messages, ok := rawClaimedMessages.([]interface{}) - require.True(t, ok, "Expected the result to be a slice of interface{}") - firstMsg, ok := messages[0].([]interface{}) - require.True(t, ok, "Expected message details to be a slice of interface{}") - msgID, ok := firstMsg[0].(string) - require.True(t, ok, "Expected message ID to be a string") - require.Equal(t, "1-0", msgID, "Expected claimed message ID to match") - - cmd := rdb.XClaimJustID(ctx, &redis.XClaimArgs{ - Stream: streamName, - Group: groupName, - Consumer: consumer1Name, - MinIdle: 0, - Messages: []string{"1-0"}, - }) - - claimedIDs, err := cmd.Result() - require.NoError(t, err) - require.Len(t, claimedIDs, 1, "Expected to claim exactly one message ID") - require.Equal(t, "1-0", claimedIDs[0], "Expected claimed message ID to match") - }) - - t.Run("XAUTOCLAIM can claim PEL items from another consume", func(t *testing.T) { - - streamName := "mystream" - groupName := "mygroup" - var id1 string - require.NoError(t, rdb.Del(ctx, streamName).Err()) - { - rsp := rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "*", - Values: []string{"a", "1"}, - }) - require.NoError(t, rsp.Err()) - id1 = rsp.Val() - } - var id2 string - { - rsp := rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "*", - Values: []string{"b", "2"}, - }) - require.NoError(t, rsp.Err()) - id2 = rsp.Val() - } - var id3 string - { - rsp := rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "*", - Values: []string{"c", "3"}, - }) - require.NoError(t, rsp.Err()) - id3 = rsp.Val() - } - var id4 string - { - rsp := rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "*", - Values: []string{"d", "4"}, - }) - require.NoError(t, rsp.Err()) - id4 = rsp.Val() - } - - require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) - - consumer1 := "consumer1" - consumer2 := "consumer2" - { - rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumer1, - Streams: []string{streamName, ">"}, - Count: 1, - }) - require.NoError(t, rsp.Err()) - require.Len(t, rsp.Val(), 1) - require.Len(t, rsp.Val()[0].Messages, 1) - require.Equal(t, id1, rsp.Val()[0].Messages[0].ID) - require.Len(t, rsp.Val()[0].Messages[0].Values, 1) - require.Equal(t, "1", rsp.Val()[0].Messages[0].Values["a"]) - } - - { - time.Sleep(200 * time.Millisecond) - rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{ - Stream: streamName, - Group: groupName, - Consumer: consumer2, - MinIdle: 10 * time.Millisecond, - Count: 1, - Start: "-", - }) - require.NoError(t, rsp.Err()) - msgs, start := rsp.Val() - require.Equal(t, "0-0", start) - require.Len(t, msgs, 1) - require.Len(t, msgs[0].Values, 1) - require.Equal(t, "1", msgs[0].Values["a"]) - } - - { - rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumer1, - Streams: []string{streamName, ">"}, - Count: 3, - }) - require.NoError(t, rsp.Err()) - - time.Sleep(time.Millisecond * 200) - require.NoError(t, rdb.XDel(ctx, streamName, id2).Err()) - } - - { - cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, consumer2, 10, "-", "COUNT", 3) - require.NoError(t, cmd.Err()) - require.Equal(t, []interface{}{ - id4, - []interface{}{ - []interface{}{ - id1, - []interface{}{"a", "1"}, - }, - []interface{}{ - id3, - []interface{}{"c", "3"}, - }, - }, - []interface{}{ - id2, - }, - }, cmd.Val()) - } - - { - time.Sleep(time.Millisecond * 200) - require.NoError(t, rdb.XDel(ctx, streamName, id4).Err()) - rsp := rdb.XAutoClaimJustID(ctx, &redis.XAutoClaimArgs{ - Stream: streamName, - Group: groupName, - Consumer: consumer2, - MinIdle: 10 * time.Millisecond, - Start: "-", - }) - require.NoError(t, rsp.Err()) - msgs, start := rsp.Val() - require.Equal(t, "0-0", start) - require.Len(t, msgs, 2) - require.Equal(t, id1, msgs[0]) - require.Equal(t, id3, msgs[1]) - } - }) - - t.Run("XAUTOCLAIM as an iterator", func(t *testing.T) { - streamName := "mystream" - groupName := "mygroup" - var id3, id5 string - require.NoError(t, rdb.Del(ctx, streamName).Err()) - { - rsp := rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "*", - Values: []string{"a", "1"}, - }) - require.NoError(t, rsp.Err()) - } - { - rsp := rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "*", - Values: []string{"b", "2"}, - }) - require.NoError(t, rsp.Err()) - } - { - rsp := rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "*", - Values: []string{"c", "3"}, - }) - require.NoError(t, rsp.Err()) - id3 = rsp.Val() - } - { - rsp := rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "*", - Values: []string{"d", "4"}, - }) - require.NoError(t, rsp.Err()) - } - { - rsp := rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "*", - Values: []string{"e", "5"}, - }) - require.NoError(t, rsp.Err()) - id5 = rsp.Val() - } - require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) - - consumer1, consumer2 := "consumer1", "consumer2" - { - rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumer1, - Streams: []string{streamName, ">"}, - Count: 90, - }) - require.NoError(t, rsp.Err()) - time.Sleep(200 * time.Millisecond) - } - { - rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{ - Stream: streamName, - Group: groupName, - Consumer: consumer2, - MinIdle: 10 * time.Millisecond, - Count: 2, - Start: "-", - }) - require.NoError(t, rsp.Err()) - msgs, start := rsp.Val() - require.Equal(t, id3, start) - require.Len(t, msgs, 2) - require.Len(t, msgs[0].Values, 1) - require.Equal(t, "1", msgs[0].Values["a"]) - } - - { - rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{ - Stream: streamName, - Group: groupName, - Consumer: consumer2, - MinIdle: 10 * time.Millisecond, - Start: id3, - Count: 2, - }) - require.NoError(t, rsp.Err()) - msgs, start := rsp.Val() - require.Equal(t, id5, start) - require.Len(t, msgs, 2) - require.Len(t, msgs[0].Values, 1) - require.Equal(t, "3", msgs[0].Values["c"]) - } - - { - rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{ - Stream: streamName, - Group: groupName, - Consumer: consumer2, - MinIdle: 10 * time.Millisecond, - Start: id5, - Count: 1, - }) - require.NoError(t, rsp.Err()) - msgs, start := rsp.Val() - require.Equal(t, "0-0", start) - require.Len(t, msgs, 1) - require.Len(t, msgs[0].Values, 1) - require.Equal(t, "5", msgs[0].Values["e"]) - } - }) - - t.Run("XAUTOCLAIM with XDEL", func(t *testing.T) { - streamName := "x" - groupName := "grp" - require.NoError(t, rdb.Del(ctx, streamName).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "1-0", - Values: map[string]interface{}{"f": "v"}, - }).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "2-0", - Values: map[string]interface{}{"f": "v"}, - }).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "3-0", - Values: map[string]interface{}{"f": "v"}, - }).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) - { - rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: "Alice", - Streams: []string{streamName, ">"}, - }) - require.NoError(t, rsp.Err()) - require.Len(t, rsp.Val(), 1) - require.Len(t, rsp.Val()[0].Messages, 3) - require.Equal(t, "1-0", rsp.Val()[0].Messages[0].ID) - require.Equal(t, "v", rsp.Val()[0].Messages[0].Values["f"]) - require.Equal(t, "2-0", rsp.Val()[0].Messages[1].ID) - require.Equal(t, "v", rsp.Val()[0].Messages[1].Values["f"]) - require.Equal(t, "3-0", rsp.Val()[0].Messages[2].ID) - require.Equal(t, "v", rsp.Val()[0].Messages[2].Values["f"]) - } - { - require.NoError(t, rdb.XDel(ctx, streamName, "2-0").Err()) - cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, "Bob", 0, "0-0") - require.NoError(t, cmd.Err()) - require.Equal(t, []interface{}{ - "0-0", - []interface{}{ - []interface{}{ - "1-0", - []interface{}{"f", "v"}, - }, - []interface{}{ - "3-0", - []interface{}{"f", "v"}, - }, - }, - []interface{}{ - "2-0", - }, - }, cmd.Val()) - } - }) - - t.Run("XAUTOCLAIM with XDEL and count", func(t *testing.T) { - streamName := "x" - groupName := "grp" - require.NoError(t, rdb.Del(ctx, streamName).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "1-0", - Values: map[string]interface{}{"f": "v"}, - }).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "2-0", - Values: map[string]interface{}{"f": "v"}, - }).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "3-0", - Values: map[string]interface{}{"f": "v"}, - }).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) - { - rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: "Alice", - Streams: []string{streamName, ">"}, - }) - require.NoError(t, rsp.Err()) - require.Len(t, rsp.Val(), 1) - require.Len(t, rsp.Val()[0].Messages, 3) - require.Equal(t, "1-0", rsp.Val()[0].Messages[0].ID) - require.Equal(t, "v", rsp.Val()[0].Messages[0].Values["f"]) - require.Equal(t, "2-0", rsp.Val()[0].Messages[1].ID) - require.Equal(t, "v", rsp.Val()[0].Messages[1].Values["f"]) - require.Equal(t, "3-0", rsp.Val()[0].Messages[2].ID) - require.Equal(t, "v", rsp.Val()[0].Messages[2].Values["f"]) - } - { - require.NoError(t, rdb.XDel(ctx, streamName, "1-0").Err()) - require.NoError(t, rdb.XDel(ctx, streamName, "2-0").Err()) - cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, "Bob", 0, "0-0", "COUNT", 1) - require.NoError(t, cmd.Err()) - require.Equal(t, []interface{}{ - "2-0", - []interface{}{}, - []interface{}{ - "1-0", - }, - }, cmd.Val()) - } - { - cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, "Bob", 0, "2-0", "COUNT", 1) - require.NoError(t, cmd.Err()) - require.Equal(t, []interface{}{ - "3-0", - []interface{}{}, - []interface{}{ - "2-0", - }, - }, cmd.Val()) - } - { - cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, "Bob", 0, "3-0", "COUNT", 1) - require.NoError(t, cmd.Err()) - require.Equal(t, []interface{}{ - "0-0", - []interface{}{ - []interface{}{ - "3-0", - []interface{}{"f", "v"}, - }, - }, - []interface{}{}, - }, cmd.Val()) - } - // assert_equal [XPENDING x grp - + 10 Alice] {} - // add xpending to this test case when it is supported - }) - - t.Run("XAUTOCLAIM with out of range count", func(t *testing.T) { - err := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{ - Stream: "x", - Group: "grp", - Consumer: "Bob", - MinIdle: 0, - Start: "3-0", - Count: 8070450532247928833, - }).Err() - require.Error(t, err) - require.True(t, strings.HasPrefix(err.Error(), "ERR COUNT")) - }) - - t.Run("XAUTOCLAIM COUNT must be > 0", func(t *testing.T) { - cmd := rdb.Do(ctx, "XAUTOCLAIM", "key", "group", "consumer", 1, 1, "COUNT", 0) - require.Error(t, cmd.Err()) - require.Equal(t, "ERR COUNT must be > 0", cmd.Err().Error()) - }) + // t.Run("XGROUP CREATE with different kinds of commands and XGROUP DESTROY", func(t *testing.T) { + // streamName := "test-stream-a" + // groupName := "test-group-a" + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // // No such stream (No such key) + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$").Err()) + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD", "10").Err()) + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD").Err()) + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "MKSTREAM", "ENTRIESREAD").Err()) + // require.NoError(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "MKSTREAM").Err()) + // require.NoError(t, rdb.XInfoStream(ctx, streamName).Err()) + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$").Err()) + // // Invalid syntax + // groupName = "test-group-b" + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREAT", streamName, groupName, "$").Err()) + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIEREAD", "10").Err()) + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD", "-10").Err()) + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, "1test-group-c", "$").Err()) + + // require.NoError(t, rdb.Del(ctx, "myStream").Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "myStream", Values: []string{"iTeM", "1", "vAluE", "a"}}).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, "myStream", "myGroup", "$").Err()) + // result, err := rdb.XGroupDestroy(ctx, "myStream", "myGroup").Result() + // require.NoError(t, err) + // require.Equal(t, int64(1), result) + // result, err = rdb.XGroupDestroy(ctx, "myStream", "myGroup").Result() + // require.NoError(t, err) + // require.Equal(t, int64(0), result) + // }) + + // t.Run("XGROUP CREATECONSUMER with different kinds of commands", func(t *testing.T) { + // streamName := "test-stream" + // groupName := "test-group" + // consumerName := "test-consumer" + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // //No such stream + // require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "1-0", + // Values: []string{"data", "a"}, + // }).Err()) + // //no such group + // require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err()) + + // r := rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Val() + // require.Equal(t, int64(1), r) + // r = rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Val() + // require.Equal(t, int64(0), r) + // }) + + // t.Run("XGROUP DELCONSUMER with different kinds of commands", func(t *testing.T) { + // streamName := "test-stream" + // groupName := "test-group" + // consumerName := "test-consumer" + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // //No such stream + // require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "1-0", + // Values: []string{"data", "a"}, + // }).Err()) + // //no such group + // require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err()) + // require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) + + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "*", + // Values: []string{"data1", "a1"}, + // }).Err()) + // require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumerName, + // Streams: []string{streamName, ">"}, + // Count: 1, + // NoAck: false, + // }).Err()) + // ri, erri := rdb.XInfoGroups(ctx, streamName).Result() + // require.NoError(t, erri) + // require.Equal(t, int64(1), ri[0].Consumers) + // require.Equal(t, int64(1), ri[0].Pending) + + // r, err := rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Result() + // require.NoError(t, err) + // require.Equal(t, int64(1), r) + // ri, erri = rdb.XInfoGroups(ctx, streamName).Result() + // require.NoError(t, erri) + // require.Equal(t, int64(0), ri[0].Consumers) + // require.Equal(t, int64(0), ri[0].Pending) + + // require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "*", + // Values: []string{"data2", "a2"}, + // }).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "*", + // Values: []string{"data3", "a3"}, + // }).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "*", + // Values: []string{"data4", "a4"}, + // }).Err()) + // require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumerName, + // Streams: []string{streamName, ">"}, + // Count: 3, + // NoAck: false, + // }).Err()) + // ri, erri = rdb.XInfoGroups(ctx, streamName).Result() + // require.NoError(t, erri) + // require.Equal(t, int64(1), ri[0].Consumers) + // require.Equal(t, int64(3), ri[0].Pending) + // r, err = rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Result() + // require.NoError(t, err) + // require.Equal(t, int64(3), r) + // ri, erri = rdb.XInfoGroups(ctx, streamName).Result() + // require.NoError(t, erri) + // require.Equal(t, int64(0), ri[0].Consumers) + // require.Equal(t, int64(0), ri[0].Pending) + // }) + + // t.Run("XGROUP SETID with different kinds of commands", func(t *testing.T) { + // streamName := "test-stream" + // groupName := "test-group" + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // //No such stream + // require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, "$").Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "1-0", + // Values: []string{"data", "a"}, + // }).Err()) + // //No such group + // require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, "$").Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err()) + + // require.NoError(t, rdb.XGroupSetID(ctx, streamName, groupName, "0-0").Err()) + // require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entries", "100").Err()) + // require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entriesread", "-100").Err()) + // require.NoError(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entriesread", "100").Err()) + // }) + + // t.Run("XINFO GROUPS and XINFO CONSUMERS", func(t *testing.T) { + // streamName := "test-stream" + // group1 := "t1" + // group2 := "t2" + // consumer1 := "c1" + // consumer2 := "c2" + // consumer3 := "c3" + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "1-0", + // Values: []string{"data", "a"}, + // }).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, group1, "$").Err()) + // r := rdb.XInfoGroups(ctx, streamName).Val() + // require.Equal(t, group1, r[0].Name) + // require.Equal(t, int64(0), r[0].Consumers) + // require.Equal(t, int64(0), r[0].Pending) + // require.Equal(t, "1-0", r[0].LastDeliveredID) + // require.Equal(t, int64(0), r[0].EntriesRead) + // require.Equal(t, int64(0), r[0].Lag) + + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "2-0", + // Values: []string{"data1", "b"}, + // }).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, group2, "$").Err()) + // r = rdb.XInfoGroups(ctx, streamName).Val() + // require.Equal(t, group2, r[1].Name) + // require.Equal(t, "2-0", r[1].LastDeliveredID) + + // require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group1, consumer1).Err()) + // require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group1, consumer2).Err()) + // require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group2, consumer3).Err()) + // r = rdb.XInfoGroups(ctx, streamName).Val() + // require.Equal(t, int64(2), r[0].Consumers) + // require.Equal(t, int64(1), r[1].Consumers) + + // r1 := rdb.XInfoConsumers(ctx, streamName, group1).Val() + // require.Equal(t, consumer1, r1[0].Name) + // require.Equal(t, consumer2, r1[1].Name) + // r1 = rdb.XInfoConsumers(ctx, streamName, group2).Val() + // require.Equal(t, consumer3, r1[0].Name) + // }) + + // t.Run("XINFO after delete pending message and related consumer, for issue #2350", func(t *testing.T) { + // streamName := "test-stream-2350" + // groupName := "test-group-2350" + // consumerName := "test-consumer-2350" + // require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "$").Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "*", + // Values: []string{"testing", "overflow"}, + // }).Err()) + // readRsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumerName, + // Streams: []string{streamName, ">"}, + // Count: 1, + // NoAck: false, + // }) + // require.NoError(t, readRsp.Err()) + // require.Len(t, readRsp.Val(), 1) + // streamRsp := readRsp.Val()[0] + // require.Len(t, streamRsp.Messages, 1) + // msgID := streamRsp.Messages[0] + // require.NoError(t, rdb.XAck(ctx, streamName, groupName, msgID.ID).Err()) + // require.NoError(t, rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Err()) + // infoRsp := rdb.XInfoGroups(ctx, streamName) + // require.NoError(t, infoRsp.Err()) + // infoGroups := infoRsp.Val() + // require.Len(t, infoGroups, 1) + // infoGroup := infoGroups[0] + // require.Equal(t, groupName, infoGroup.Name) + // require.Equal(t, int64(0), infoGroup.Consumers) + // require.Equal(t, int64(0), infoGroup.Pending) + // require.Equal(t, msgID.ID, infoGroup.LastDeliveredID) + // }) + + // t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue #2109", func(t *testing.T) { + // streamName := "test-stream" + // group := "group" + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "*", + // Values: []string{"data1", "b"}, + // }).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, group, "0").Err()) + // require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group, "consumer").Err()) + // require.NoError(t, rdb.XRead(ctx, &redis.XReadArgs{ + // Streams: []string{streamName, "0"}, + // }).Err()) + // }) + + // t.Run("XREADGROUP with different kinds of commands", func(t *testing.T) { + // streamName := "mystream" + // groupName := "mygroup" + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "1-0", + // Values: []string{"field1", "data1"}, + // }).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + // consumerName := "myconsumer" + // r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumerName, + // Streams: []string{streamName, ">"}, + // Count: 1, + // NoAck: false, + // }).Result() + // require.NoError(t, err) + // require.Equal(t, []redis.XStream{{ + // Stream: streamName, + // Messages: []redis.XMessage{{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}}}, + // }}, r) + + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "2-0", + // Values: []string{"field2", "data2"}, + // }).Err()) + // r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumerName, + // Streams: []string{streamName, ">"}, + // Count: 1, + // NoAck: false, + // }).Result() + // require.NoError(t, err) + // require.Equal(t, []redis.XStream{{ + // Stream: streamName, + // Messages: []redis.XMessage{{ID: "2-0", Values: map[string]interface{}{"field2": "data2"}}}, + // }}, r) + + // r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumerName, + // Streams: []string{streamName, "0"}, + // Count: 2, + // NoAck: false, + // }).Result() + // require.NoError(t, err) + // require.Equal(t, []redis.XStream{{ + // Stream: streamName, + // Messages: []redis.XMessage{{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}}, + // {ID: "2-0", Values: map[string]interface{}{"field2": "data2"}}}, + // }}, r) + + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "3-0", + // Values: []string{"field3", "data3"}, + // }).Err()) + // r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumerName, + // Streams: []string{streamName, ">"}, + // Count: 1, + // NoAck: true, + // }).Result() + // require.NoError(t, err) + // require.Equal(t, []redis.XStream{{ + // Stream: streamName, + // Messages: []redis.XMessage{{ID: "3-0", Values: map[string]interface{}{"field3": "data3"}}}, + // }}, r) + // r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumerName, + // Streams: []string{streamName, "0"}, + // Count: 2, + // NoAck: false, + // }).Result() + // require.NoError(t, err) + // require.Equal(t, []redis.XStream{{ + // Stream: streamName, + // Messages: []redis.XMessage{{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}}, + // {ID: "2-0", Values: map[string]interface{}{"field2": "data2"}}}, + // }}, r) + + // c := srv.NewClient() + // defer func() { require.NoError(t, c.Close()) }() + // ch := make(chan []redis.XStream) + // go func() { + // ch <- c.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumerName, + // Streams: []string{streamName, ">"}, + // Count: 2, + // Block: 10 * time.Second, + // NoAck: false, + // }).Val() + // }() + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "4-0", + // Values: []string{"field4", "data4"}, + // }).Err()) + // r = <-ch + // require.Equal(t, []redis.XStream{{ + // Stream: streamName, + // Messages: []redis.XMessage{{ID: "4-0", Values: map[string]interface{}{"field4": "data4"}}}, + // }}, r) + + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "5-0", + // Values: []string{"field5", "data5"}, + // }).Err()) + // require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumerName, + // Streams: []string{streamName, ">"}, + // Count: 1, + // NoAck: false, + // }).Err()) + // require.NoError(t, rdb.XDel(ctx, streamName, "5-0").Err()) + // r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumerName, + // Streams: []string{streamName, "5"}, + // Count: 1, + // NoAck: false, + // }).Result() + // require.NoError(t, err) + // require.Equal(t, []redis.XStream{{ + // Stream: streamName, + // Messages: []redis.XMessage{{ID: "5-0", Values: map[string]interface{}(nil)}}, + // }}, r) + // }) + + // t.Run("Check xreadgroup fetches the newest data after create consumer in the command", func(t *testing.T) { + // streamName := "mystream" + // groupName := "mygroup" + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "1-0", + // Values: []string{"field1", "data1"}, + // }).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + // consumerName := "myconsumer" + // err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumerName, + // Streams: []string{streamName, ">"}, + // Count: 1, + // NoAck: false, + // }).Err() + // require.NoError(t, err) + // ri, erri := rdb.XInfoGroups(ctx, streamName).Result() + // require.NoError(t, erri) + // require.Equal(t, int64(1), ri[0].Consumers) + // }) + + // t.Run("XACK with different kinds of commands", func(t *testing.T) { + // streamName := "mystream" + // groupName := "mygroup" + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result() + // require.NoError(t, err) + // require.Equal(t, int64(0), r) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "1-0", + // Values: []string{"field1", "data1"}, + // }).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + // consumerName := "myconsumer" + // err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumerName, + // Streams: []string{streamName, ">"}, + // Count: 1, + // NoAck: false, + // }).Err() + // require.NoError(t, err) + // r, err = rdb.XAck(ctx, streamName, groupName, "1-0").Result() + // require.NoError(t, err) + // require.Equal(t, int64(1), r) + + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "2-0", + // Values: []string{"field1", "data1"}, + // }).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "3-0", + // Values: []string{"field1", "data1"}, + // }).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "4-0", + // Values: []string{"field1", "data1"}, + // }).Err()) + // err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumerName, + // Streams: []string{streamName, ">"}, + // Count: 3, + // NoAck: false, + // }).Err() + // require.NoError(t, err) + // r, err = rdb.XAck(ctx, streamName, groupName, "2-0", "3-0", "4-0").Result() + // require.NoError(t, err) + // require.Equal(t, int64(3), r) + // }) + + // t.Run("Simple XCLAIM command tests", func(t *testing.T) { + // streamName := "mystream" + // groupName := "mygroup" + // consumerName := "myconsumer" + // consumer1Name := "myconsumer1" + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "1-0", + // Values: []string{"field1", "data1"}, + // }).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + // r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumerName, + // Streams: []string{streamName, ">"}, + // Count: 1, + // NoAck: false, + // }).Result() + // require.NoError(t, err) + // require.Equal(t, []redis.XStream{{ + // Stream: streamName, + // Messages: []redis.XMessage{{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}}}, + // }}, r) + + // claimedMessages, err := rdb.XClaim(ctx, &redis.XClaimArgs{ + // Stream: streamName, + // Group: groupName, + // Consumer: consumer1Name, + // MinIdle: 0, + // Messages: []string{"1-0"}, + // }).Result() + // require.NoError(t, err) + // require.Len(t, claimedMessages, 1, "Expected to claim 1 message") + // require.Equal(t, "1-0", claimedMessages[0].ID, "Expected claimed message ID to match") + + // time.Sleep(2000 * time.Millisecond) + // minIdleTime := 1000 * time.Millisecond + // claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{ + // Stream: streamName, + // Group: groupName, + // Consumer: consumerName, + // MinIdle: minIdleTime, + // Messages: []string{"1-0"}, + // }).Result() + // require.NoError(t, err) + // require.Len(t, claimedMessages, 1, "Expected to claim 1 message if idle time is large enough") + // require.Equal(t, "1-0", claimedMessages[0].ID, "Expected claimed message ID to match") + + // minIdleTime = 60000 * time.Millisecond + // claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{ + // Stream: streamName, + // Group: groupName, + // Consumer: consumer1Name, + // MinIdle: minIdleTime, + // Messages: []string{"1-0"}, + // }).Result() + + // require.NoError(t, err) + // require.Empty(t, claimedMessages, "Expected no messages to be claimed due to insufficient idle time") + // }) + + // t.Run("XCLAIM with different timing situations and options", func(t *testing.T) { + // streamName := "mystream" + // groupName := "mygroup" + // consumerName := "myconsumer" + // consumer1Name := "myconsumer1" + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "1-0", + // Values: []string{"field1", "data1"}, + // }).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + // r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumerName, + // Streams: []string{streamName, ">"}, + // Count: 1, + // NoAck: false, + // }).Result() + // require.NoError(t, err) + // require.Equal(t, []redis.XStream{{ + // Stream: streamName, + // Messages: []redis.XMessage{{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}}}, + // }}, r) + + // rawClaimedMessages, err := rdb.Do(ctx, "XCLAIM", streamName, groupName, consumer1Name, "0", "1-0", "IDLE", "5000").Result() + // require.NoError(t, err) + // messages, ok := rawClaimedMessages.([]interface{}) + // require.True(t, ok, "Expected the result to be a slice of interface{}") + // firstMsg, ok := messages[0].([]interface{}) + // require.True(t, ok, "Expected message details to be a slice of interface{}") + // msgID, ok := firstMsg[0].(string) + // require.True(t, ok, "Expected message ID to be a string") + // require.Equal(t, "1-0", msgID, "Expected claimed message ID to match") + + // claimedMessages, err := rdb.XClaim(ctx, &redis.XClaimArgs{ + // Stream: streamName, + // Group: groupName, + // Consumer: consumerName, + // MinIdle: 2000 * time.Millisecond, + // Messages: []string{"1-0"}, + // }).Result() + // require.NoError(t, err) + // require.Len(t, claimedMessages, 1, "Expected to claim 1 message if idle time is large enough") + // require.Equal(t, "1-0", claimedMessages[0].ID, "Expected claimed message ID to match") + + // tenSecondsAgo := time.Now().Add(-10 * time.Second).UnixMilli() + // rawClaimedMessages, err = rdb.Do(ctx, "XCLAIM", streamName, groupName, consumer1Name, "0", "1-0", "TIME", tenSecondsAgo).Result() + // require.NoError(t, err) + // messages, ok = rawClaimedMessages.([]interface{}) + // require.True(t, ok, "Expected the result to be a slice of interface{}") + // firstMsg, ok = messages[0].([]interface{}) + // require.True(t, ok, "Expected message details to be a slice of interface{}") + // msgID, ok = firstMsg[0].(string) + // require.True(t, ok, "Expected message ID to be a string") + // require.Equal(t, "1-0", msgID, "Expected claimed message ID to match") + + // claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{ + // Stream: streamName, + // Group: groupName, + // Consumer: consumerName, + // MinIdle: 5000 * time.Millisecond, + // Messages: []string{"1-0"}, + // }).Result() + // require.NoError(t, err) + // require.Len(t, claimedMessages, 1, "Expected to claim 1 message if idle time is large enough") + // require.Equal(t, "1-0", claimedMessages[0].ID, "Expected claimed message ID to match") + // }) + + // t.Run("XCLAIM command with different options", func(t *testing.T) { + // streamName := "mystream" + // groupName := "mygroup" + // consumerName := "myconsumer" + // consumer1Name := "myconsumer1" + + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "1-0", + // Values: []string{"field1", "data1"}, + // }).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + + // rawClaimedMessages, err := rdb.Do(ctx, "XCLAIM", streamName, groupName, consumerName, "0", "1-0", "FORCE").Result() + // require.NoError(t, err) + // messages, ok := rawClaimedMessages.([]interface{}) + // require.True(t, ok, "Expected the result to be a slice of interface{}") + // firstMsg, ok := messages[0].([]interface{}) + // require.True(t, ok, "Expected message details to be a slice of interface{}") + // msgID, ok := firstMsg[0].(string) + // require.True(t, ok, "Expected message ID to be a string") + // require.Equal(t, "1-0", msgID, "Expected claimed message ID to match") + + // cmd := rdb.XClaimJustID(ctx, &redis.XClaimArgs{ + // Stream: streamName, + // Group: groupName, + // Consumer: consumer1Name, + // MinIdle: 0, + // Messages: []string{"1-0"}, + // }) + + // claimedIDs, err := cmd.Result() + // require.NoError(t, err) + // require.Len(t, claimedIDs, 1, "Expected to claim exactly one message ID") + // require.Equal(t, "1-0", claimedIDs[0], "Expected claimed message ID to match") + // }) + + // t.Run("XAUTOCLAIM can claim PEL items from another consume", func(t *testing.T) { + + // streamName := "mystream" + // groupName := "mygroup" + // var id1 string + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // { + // rsp := rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "*", + // Values: []string{"a", "1"}, + // }) + // require.NoError(t, rsp.Err()) + // id1 = rsp.Val() + // } + // var id2 string + // { + // rsp := rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "*", + // Values: []string{"b", "2"}, + // }) + // require.NoError(t, rsp.Err()) + // id2 = rsp.Val() + // } + // var id3 string + // { + // rsp := rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "*", + // Values: []string{"c", "3"}, + // }) + // require.NoError(t, rsp.Err()) + // id3 = rsp.Val() + // } + // var id4 string + // { + // rsp := rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "*", + // Values: []string{"d", "4"}, + // }) + // require.NoError(t, rsp.Err()) + // id4 = rsp.Val() + // } + + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + + // consumer1 := "consumer1" + // consumer2 := "consumer2" + // { + // rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumer1, + // Streams: []string{streamName, ">"}, + // Count: 1, + // }) + // require.NoError(t, rsp.Err()) + // require.Len(t, rsp.Val(), 1) + // require.Len(t, rsp.Val()[0].Messages, 1) + // require.Equal(t, id1, rsp.Val()[0].Messages[0].ID) + // require.Len(t, rsp.Val()[0].Messages[0].Values, 1) + // require.Equal(t, "1", rsp.Val()[0].Messages[0].Values["a"]) + // } + + // { + // time.Sleep(200 * time.Millisecond) + // rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{ + // Stream: streamName, + // Group: groupName, + // Consumer: consumer2, + // MinIdle: 10 * time.Millisecond, + // Count: 1, + // Start: "-", + // }) + // require.NoError(t, rsp.Err()) + // msgs, start := rsp.Val() + // require.Equal(t, "0-0", start) + // require.Len(t, msgs, 1) + // require.Len(t, msgs[0].Values, 1) + // require.Equal(t, "1", msgs[0].Values["a"]) + // } + + // { + // rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumer1, + // Streams: []string{streamName, ">"}, + // Count: 3, + // }) + // require.NoError(t, rsp.Err()) + + // time.Sleep(time.Millisecond * 200) + // require.NoError(t, rdb.XDel(ctx, streamName, id2).Err()) + // } + + // { + // cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, consumer2, 10, "-", "COUNT", 3) + // require.NoError(t, cmd.Err()) + // require.Equal(t, []interface{}{ + // id4, + // []interface{}{ + // []interface{}{ + // id1, + // []interface{}{"a", "1"}, + // }, + // []interface{}{ + // id3, + // []interface{}{"c", "3"}, + // }, + // }, + // []interface{}{ + // id2, + // }, + // }, cmd.Val()) + // } + + // { + // time.Sleep(time.Millisecond * 200) + // require.NoError(t, rdb.XDel(ctx, streamName, id4).Err()) + // rsp := rdb.XAutoClaimJustID(ctx, &redis.XAutoClaimArgs{ + // Stream: streamName, + // Group: groupName, + // Consumer: consumer2, + // MinIdle: 10 * time.Millisecond, + // Start: "-", + // }) + // require.NoError(t, rsp.Err()) + // msgs, start := rsp.Val() + // require.Equal(t, "0-0", start) + // require.Len(t, msgs, 2) + // require.Equal(t, id1, msgs[0]) + // require.Equal(t, id3, msgs[1]) + // } + // }) + + // t.Run("XAUTOCLAIM as an iterator", func(t *testing.T) { + // streamName := "mystream" + // groupName := "mygroup" + // var id3, id5 string + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // { + // rsp := rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "*", + // Values: []string{"a", "1"}, + // }) + // require.NoError(t, rsp.Err()) + // } + // { + // rsp := rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "*", + // Values: []string{"b", "2"}, + // }) + // require.NoError(t, rsp.Err()) + // } + // { + // rsp := rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "*", + // Values: []string{"c", "3"}, + // }) + // require.NoError(t, rsp.Err()) + // id3 = rsp.Val() + // } + // { + // rsp := rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "*", + // Values: []string{"d", "4"}, + // }) + // require.NoError(t, rsp.Err()) + // } + // { + // rsp := rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "*", + // Values: []string{"e", "5"}, + // }) + // require.NoError(t, rsp.Err()) + // id5 = rsp.Val() + // } + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + + // consumer1, consumer2 := "consumer1", "consumer2" + // { + // rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: consumer1, + // Streams: []string{streamName, ">"}, + // Count: 90, + // }) + // require.NoError(t, rsp.Err()) + // time.Sleep(200 * time.Millisecond) + // } + // { + // rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{ + // Stream: streamName, + // Group: groupName, + // Consumer: consumer2, + // MinIdle: 10 * time.Millisecond, + // Count: 2, + // Start: "-", + // }) + // require.NoError(t, rsp.Err()) + // msgs, start := rsp.Val() + // require.Equal(t, id3, start) + // require.Len(t, msgs, 2) + // require.Len(t, msgs[0].Values, 1) + // require.Equal(t, "1", msgs[0].Values["a"]) + // } + + // { + // rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{ + // Stream: streamName, + // Group: groupName, + // Consumer: consumer2, + // MinIdle: 10 * time.Millisecond, + // Start: id3, + // Count: 2, + // }) + // require.NoError(t, rsp.Err()) + // msgs, start := rsp.Val() + // require.Equal(t, id5, start) + // require.Len(t, msgs, 2) + // require.Len(t, msgs[0].Values, 1) + // require.Equal(t, "3", msgs[0].Values["c"]) + // } + + // { + // rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{ + // Stream: streamName, + // Group: groupName, + // Consumer: consumer2, + // MinIdle: 10 * time.Millisecond, + // Start: id5, + // Count: 1, + // }) + // require.NoError(t, rsp.Err()) + // msgs, start := rsp.Val() + // require.Equal(t, "0-0", start) + // require.Len(t, msgs, 1) + // require.Len(t, msgs[0].Values, 1) + // require.Equal(t, "5", msgs[0].Values["e"]) + // } + // }) + + // t.Run("XAUTOCLAIM with XDEL", func(t *testing.T) { + // streamName := "x" + // groupName := "grp" + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "1-0", + // Values: map[string]interface{}{"f": "v"}, + // }).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "2-0", + // Values: map[string]interface{}{"f": "v"}, + // }).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "3-0", + // Values: map[string]interface{}{"f": "v"}, + // }).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + // { + // rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: "Alice", + // Streams: []string{streamName, ">"}, + // }) + // require.NoError(t, rsp.Err()) + // require.Len(t, rsp.Val(), 1) + // require.Len(t, rsp.Val()[0].Messages, 3) + // require.Equal(t, "1-0", rsp.Val()[0].Messages[0].ID) + // require.Equal(t, "v", rsp.Val()[0].Messages[0].Values["f"]) + // require.Equal(t, "2-0", rsp.Val()[0].Messages[1].ID) + // require.Equal(t, "v", rsp.Val()[0].Messages[1].Values["f"]) + // require.Equal(t, "3-0", rsp.Val()[0].Messages[2].ID) + // require.Equal(t, "v", rsp.Val()[0].Messages[2].Values["f"]) + // } + // { + // require.NoError(t, rdb.XDel(ctx, streamName, "2-0").Err()) + // cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, "Bob", 0, "0-0") + // require.NoError(t, cmd.Err()) + // require.Equal(t, []interface{}{ + // "0-0", + // []interface{}{ + // []interface{}{ + // "1-0", + // []interface{}{"f", "v"}, + // }, + // []interface{}{ + // "3-0", + // []interface{}{"f", "v"}, + // }, + // }, + // []interface{}{ + // "2-0", + // }, + // }, cmd.Val()) + // } + // }) + + // t.Run("XAUTOCLAIM with XDEL and count", func(t *testing.T) { + // streamName := "x" + // groupName := "grp" + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "1-0", + // Values: map[string]interface{}{"f": "v"}, + // }).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "2-0", + // Values: map[string]interface{}{"f": "v"}, + // }).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "3-0", + // Values: map[string]interface{}{"f": "v"}, + // }).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + // { + // rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + // Group: groupName, + // Consumer: "Alice", + // Streams: []string{streamName, ">"}, + // }) + // require.NoError(t, rsp.Err()) + // require.Len(t, rsp.Val(), 1) + // require.Len(t, rsp.Val()[0].Messages, 3) + // require.Equal(t, "1-0", rsp.Val()[0].Messages[0].ID) + // require.Equal(t, "v", rsp.Val()[0].Messages[0].Values["f"]) + // require.Equal(t, "2-0", rsp.Val()[0].Messages[1].ID) + // require.Equal(t, "v", rsp.Val()[0].Messages[1].Values["f"]) + // require.Equal(t, "3-0", rsp.Val()[0].Messages[2].ID) + // require.Equal(t, "v", rsp.Val()[0].Messages[2].Values["f"]) + // } + // { + // require.NoError(t, rdb.XDel(ctx, streamName, "1-0").Err()) + // require.NoError(t, rdb.XDel(ctx, streamName, "2-0").Err()) + // cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, "Bob", 0, "0-0", "COUNT", 1) + // require.NoError(t, cmd.Err()) + // require.Equal(t, []interface{}{ + // "2-0", + // []interface{}{}, + // []interface{}{ + // "1-0", + // }, + // }, cmd.Val()) + // } + // { + // cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, "Bob", 0, "2-0", "COUNT", 1) + // require.NoError(t, cmd.Err()) + // require.Equal(t, []interface{}{ + // "3-0", + // []interface{}{}, + // []interface{}{ + // "2-0", + // }, + // }, cmd.Val()) + // } + // { + // cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, "Bob", 0, "3-0", "COUNT", 1) + // require.NoError(t, cmd.Err()) + // require.Equal(t, []interface{}{ + // "0-0", + // []interface{}{ + // []interface{}{ + // "3-0", + // []interface{}{"f", "v"}, + // }, + // }, + // []interface{}{}, + // }, cmd.Val()) + // } + // // assert_equal [XPENDING x grp - + 10 Alice] {} + // // add xpending to this test case when it is supported + // }) + + // t.Run("XAUTOCLAIM with out of range count", func(t *testing.T) { + // err := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{ + // Stream: "x", + // Group: "grp", + // Consumer: "Bob", + // MinIdle: 0, + // Start: "3-0", + // Count: 8070450532247928833, + // }).Err() + // require.Error(t, err) + // require.True(t, strings.HasPrefix(err.Error(), "ERR COUNT")) + // }) + + // t.Run("XAUTOCLAIM COUNT must be > 0", func(t *testing.T) { + // cmd := rdb.Do(ctx, "XAUTOCLAIM", "key", "group", "consumer", 1, 1, "COUNT", 0) + // require.Error(t, cmd.Err()) + // require.Equal(t, "ERR COUNT must be > 0", cmd.Err().Error()) + // }) } func parseStreamEntryID(id string) (ts int64, seqNum int64) {