diff --git a/pkg/backend/redis/queue.go b/pkg/backend/redis/queue.go index 44ea134..db073b5 100644 --- a/pkg/backend/redis/queue.go +++ b/pkg/backend/redis/queue.go @@ -212,11 +212,10 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { if err != nil { return err } - // WATCH {all_queues_key} {queue_key} + // WATCH workers_key + // WATCH tasks_key // .. worker_keys = collect worker keys - // WATCH worker_keys // .. task_keys = collect task keys - // WATCh task_keys // MULTI // DEL {queue_key} worker_keys task_keys // HDEL {all_queues_key} {queueName} @@ -230,14 +229,12 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { if err != nil { return err } - tx.Watch(workerKeysToDelete...) keysToDelete = append(keysToDelete, workerKeysToDelete...) taskKeysToDelete, err := b.allTasksKeysForDeleteQueue(tx, queue.UID.String()) if err != nil { return err } - tx.Watch(taskKeysToDelete...) keysToDelete = append(keysToDelete, taskKeysToDelete...) _, err = tx.TxPipelined(func(pipe redis.Pipeliner) error { @@ -255,7 +252,8 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { Str("operation", "DeleteQueue"). Logger(), txf, - b.allQueuesKey(), b.queueKey(queue.UID.String()), + b.workersKey(queue.UID.String()), + b.tasksKey(queue.UID.String()), ) }