Skip to content

Commit

Permalink
Merge pull request #52 from ryotarai/faster-delete-queue
Browse files Browse the repository at this point in the history
Reduce keys to watch in DeleteQueue.
  • Loading branch information
shioshiota authored Nov 15, 2023
2 parents 25b1b72 + b28c589 commit 87e2795
Showing 1 changed file with 4 additions and 6 deletions.
10 changes: 4 additions & 6 deletions pkg/backend/redis/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,10 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error {
if err != nil {
return err
}
// WATCH {all_queues_key} {queue_key}
// WATCH workers_key
// WATCH tasks_key
// .. worker_keys = collect worker keys
// WATCH worker_keys
// .. task_keys = collect task keys
// WATCh task_keys
// MULTI
// DEL {queue_key} worker_keys task_keys
// HDEL {all_queues_key} {queueName}
Expand All @@ -230,14 +229,12 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error {
if err != nil {
return err
}
tx.Watch(workerKeysToDelete...)
keysToDelete = append(keysToDelete, workerKeysToDelete...)

taskKeysToDelete, err := b.allTasksKeysForDeleteQueue(tx, queue.UID.String())
if err != nil {
return err
}
tx.Watch(taskKeysToDelete...)
keysToDelete = append(keysToDelete, taskKeysToDelete...)

_, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
Expand All @@ -255,7 +252,8 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error {
Str("operation", "DeleteQueue").
Logger(),
txf,
b.allQueuesKey(), b.queueKey(queue.UID.String()),
b.workersKey(queue.UID.String()),
b.tasksKey(queue.UID.String()),
)
}

Expand Down

0 comments on commit 87e2795

Please sign in to comment.