From bd14efa3fb6899da8814dc639bd36db4ec89d1f6 Mon Sep 17 00:00:00 2001 From: Alan Braithwaite Date: Tue, 9 Jul 2024 17:17:25 -0500 Subject: [PATCH] fix redis test --- test/stream_test.go | 3 +-- x/redis/redis.go | 5 ++++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/test/stream_test.go b/test/stream_test.go index 6dfaebb..f3615fe 100644 --- a/test/stream_test.go +++ b/test/stream_test.go @@ -32,8 +32,7 @@ func TestRedis(t *testing.T) { Password: "", // No password set DB: 0, // Use default DB }) - gid := ksuid.New().String() - _, err := rc.XGroupCreateMkStream(context.Background(), "kawa/topic", gid, "$").Result() + _, err := rc.XGroupCreateMkStream(context.Background(), "kawa/topic", "kawa", "$").Result() if err != nil { fmt.Println("err initing", err) } diff --git a/x/redis/redis.go b/x/redis/redis.go index aa6e391..4614614 100644 --- a/x/redis/redis.go +++ b/x/redis/redis.go @@ -6,6 +6,7 @@ import ( goredis "github.com/redis/go-redis/v9" "github.com/runreveal/kawa" + "github.com/segmentio/ksuid" ) type Options struct { @@ -98,11 +99,13 @@ func (s *Source) Run(ctx context.Context) error { func (s *Source) recvLoop(ctx context.Context) error { var err error + gid := ksuid.New().String() + outer: for { msgs, err := s.client.XReadGroup(ctx, &goredis.XReadGroupArgs{ Group: "kawa", - Consumer: "1", + Consumer: gid, Streams: []string{s.opts.topic, ">"}, Count: 100, Block: 0,