diff --git a/pubsub/redis/redis.go b/pubsub/redis/redis.go index 81fa61d5f7..ad057d42f4 100644 --- a/pubsub/redis/redis.go +++ b/pubsub/redis/redis.go @@ -109,8 +109,8 @@ func (r *redisStreams) Publish(ctx context.Context, req *pubsub.PublishRequest) return nil } -func (r *redisStreams) CreateConsumerGroup(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { - err := r.client.XGroupCreateMkStream(ctx, req.Topic, r.clientSettings.ConsumerID, "0") +func (r *redisStreams) CreateConsumerGroup(ctx context.Context, stream string) error { + err := r.client.XGroupCreateMkStream(ctx, stream, r.clientSettings.ConsumerID, "0") // Ignore BUSYGROUP errors if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { r.logger.Errorf("redis streams: %s", err) @@ -124,7 +124,7 @@ func (r *redisStreams) Subscribe(ctx context.Context, req pubsub.SubscribeReques return errors.New("component is closed") } - if err := r.client.XGroupCreateMkStream(ctx, req.Topic, r.clientSettings.ConsumerID, "0"); err != nil { + if err := r.CreateConsumerGroup(ctx, req.Topic); err != nil { return err } @@ -253,7 +253,7 @@ func (r *redisStreams) pollNewMessagesLoop(ctx context.Context, stream string, h if strings.Contains(err.Error(), "NOGROUP") { r.logger.Warnf("redis streams: consumer group %s does not exist for stream %s. This could mean the server experienced data loss, or the group/stream was deleted.", r.clientSettings.ConsumerID, stream) r.logger.Warnf("redis streams: recreating group %s for stream %s", r.clientSettings.ConsumerID, stream) - r.client.XGroupCreateMkStream(ctx, stream, r.clientSettings.ConsumerID, "0") + r.CreateConsumerGroup(ctx, stream) } r.logger.Errorf("redis streams: error reading from stream %s: %s", stream, err) }