diff --git a/test/stream_test.go b/test/stream_test.go index c2146b8..aac01e0 100644 --- a/test/stream_test.go +++ b/test/stream_test.go @@ -47,8 +47,7 @@ func TestRedis(t *testing.T) { Password: "", // No password set DB: 0, // Use default DB }) - gid := ksuid.New().String() - _, err := rc.XGroupCreateMkStream(context.Background(), "kawa/topic", gid, "$").Result() + _, err := rc.XGroupCreateMkStream(context.Background(), "kawa/topic", "kawa", "$").Result() if err != nil { fmt.Println("err initing", err) } diff --git a/x/redis/redis.go b/x/redis/redis.go index aa6e391..4614614 100644 --- a/x/redis/redis.go +++ b/x/redis/redis.go @@ -6,6 +6,7 @@ import ( goredis "github.com/redis/go-redis/v9" "github.com/runreveal/kawa" + "github.com/segmentio/ksuid" ) type Options struct { @@ -98,11 +99,13 @@ func (s *Source) Run(ctx context.Context) error { func (s *Source) recvLoop(ctx context.Context) error { var err error + gid := ksuid.New().String() + outer: for { msgs, err := s.client.XReadGroup(ctx, &goredis.XReadGroupArgs{ Group: "kawa", - Consumer: "1", + Consumer: gid, Streams: []string{s.opts.topic, ">"}, Count: 100, Block: 0,