add digest scores for faster deletes in sorted sets

ezekg committed Feb 14, 2024
1 parent 779f297 commit 60e380f
Showing 13 changed files with 121 additions and 38 deletions.
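
In short, the commit stores each lock's schedule timestamp under a dedicated <digest>:SCORE string key and uses it to narrow the scan when a digest has to be removed from Sidekiq's schedule or retry sorted sets. A minimal sketch of the idea, assuming a plain redis-rb client and a made-up digest (none of the helper code below is the gem's API):

```ruby
require "redis"

redis  = Redis.new
digest = "uniquejobs:24c5b03e2d49d765e5dfb2d7c51c5929" # hypothetical digest
at     = Time.now.to_f + 3600                          # job scheduled an hour from now

# When the job is locked, remember its schedule timestamp next to the other lock keys.
redis.set("#{digest}:SCORE", at)

# When the digest has to leave the schedule set, start the scan at that score
# instead of walking the whole sorted set from index 0.
score = redis.get("#{digest}:SCORE")
candidates = redis.zrangebyscore("schedule", score, "+inf", limit: [0, 50])
candidates.each { |payload| redis.zrem("schedule", payload) if payload.include?(digest) }
```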
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs/batch_delete.rb
@@ -18,6 +18,7 @@ class BatchDelete
PRIMED
LOCKED
INFO
SCORE
].freeze

# includes "SidekiqUniqueJobs::Connection"
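Adding SCORE to this frozen suffix list keeps batch deletes from leaving orphaned score keys behind, since the batch delete expands each digest into its suffixed per-lock keys before unlinking them. A rough illustration of that expansion (the constant name and full suffix list are assumptions based on the visible hunk):

```ruby
# Illustrative only: how a digest fans out into the keys a batch delete unlinks.
SUFFIXES = %w[QUEUED PRIMED LOCKED INFO SCORE].freeze

def keys_for(digest)
  [digest] + SUFFIXES.map { |suffix| "#{digest}:#{suffix}" }
end

keys_for("uniquejobs:abc123")
# => ["uniquejobs:abc123", "uniquejobs:abc123:QUEUED", ..., "uniquejobs:abc123:SCORE"]
```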
7 changes: 6 additions & 1 deletion lib/sidekiq_unique_jobs/key.rb
@@ -37,6 +37,10 @@ class Key
# @!attribute [r] expiring_digests
# @return [String] the zset with locked expiring_digests
attr_reader :expiring_digests
#
# @!attribute [r] score
# @return [String] score (timestamp) for the lock
attr_reader :score

#
# Initialize a new Key
@@ -49,6 +53,7 @@ def initialize(digest)
@primed = suffixed_key("PRIMED")
@locked = suffixed_key("LOCKED")
@info = suffixed_key("INFO")
@score = suffixed_key("SCORE")
@changelog = CHANGELOGS
@digests = DIGESTS
@expiring_digests = EXPIRING_DIGESTS
@@ -86,7 +91,7 @@ def ==(other)
# @return [Array] an ordered array with all keys
#
def to_a
[digest, queued, primed, locked, info, changelog, digests, expiring_digests]
[digest, queued, primed, locked, info, changelog, digests, expiring_digests, score]
end

private
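With the new attribute the Key object exposes the score key alongside the others, using the same <digest>:SUFFIX naming that key_spec.rb asserts further down. A small usage sketch (the digest is illustrative):

```ruby
require "sidekiq_unique_jobs"

key = SidekiqUniqueJobs::Key.new("uniquejobs:abc123")

key.score # => "uniquejobs:abc123:SCORE"
key.to_a  # => ordered array of every key name, now with the score key appended
```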
17 changes: 16 additions & 1 deletion lib/sidekiq_unique_jobs/lock.rb
@@ -73,6 +73,7 @@ def lock(job_id, lock_info = {}, score = nil)
add_digest_to_set(pipeline, lock_info, score)
pipeline.zadd(key.changelog, score, changelog_json(job_id, "queue.lua", "Queued"))
pipeline.zadd(key.changelog, score, changelog_json(job_id, "lock.lua", "Locked"))
pipeline.set(key.score, now_f)
end
end
end
@@ -129,7 +130,7 @@ def del
redis do |conn|
conn.multi do |pipeline|
pipeline.zrem(DIGESTS, key.digest)
pipeline.del(key.digest, key.queued, key.primed, key.locked, key.info)
pipeline.del(key.digest, key.queued, key.primed, key.locked, key.info, key.score)
end
end
end
@@ -264,6 +265,19 @@ def changelog
@changelog ||= Changelog.new
end

#
# The score for the lock
#
# @note Used to improve performance when deleting digests from
# sorted sets, e.g. the schedule and retry sets.
#
#
# @return [Redis::String] a string representation of the key
#
def score
@score ||= Redis::String.new(key.score)
end

#
# A nicely formatted string with information about this lock
#
@@ -280,6 +294,7 @@ def to_s
primed_jids: #{primed_jids}
locked_jids: #{locked_jids}
changelogs: #{changelogs}
score: #{score.value}
MESSAGE
end

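Lock#lock now stamps the score key inside its MULTI block, Lock#del clears it together with the other per-lock keys, and the new score reader wraps it so to_s can print it. A hedged inspection sketch (the constructor argument is an assumption; locks are normally created by the gem's middleware):

```ruby
lock = SidekiqUniqueJobs::Lock.new("uniquejobs:abc123") # assumed: digest passed straight in

lock.lock("jid-123", {}, Time.now.to_f + 60) # sets <digest>:SCORE as part of the MULTI
lock.score.value                             # => the stored timestamp, read back as a string
lock.del                                     # deletes <digest>:SCORE with the other lock keys
```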
37 changes: 37 additions & 0 deletions lib/sidekiq_unique_jobs/locksmith.rb
@@ -230,6 +230,8 @@ def enqueue(conn)
return unless validity >= 0 || config.pttl.zero?

write_lock_info(conn)
write_lock_score(conn)

yield job_id
end

@@ -331,6 +333,20 @@ def write_lock_info(conn)
conn.set(key.info, lock_info)
end

#
# Writes lock score to redis.
# The lock score is used to improve performance when iterating large sorted
# sets e.g the schedule and retry sets.
#
#
# @return [void]
#
def write_lock_score(conn)
return unless lock_score?

conn.set(key.score, lock_score)
end

#
# Used to combat redis imprecision with ttl/pttl
#
@@ -378,6 +394,27 @@ def lock_info
)
end

#
# Reads lock score from redis.
# The lock score is used to improve performance when iterating large sorted
# sets e.g the schedule and retry sets.
#
#
# @return [Float]
#
def lock_score
@lock_score ||= item[AT]
end

#
# Checks if a lock score exists.
#
# @return [true, false]
#
def lock_score?
!lock_score.nil?
end

def redis_version
@redis_version ||= SidekiqUniqueJobs.config.redis_version
end
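write_lock_score mirrors write_lock_info: the key is only written when the job item actually carries a timestamp, so immediate jobs never get a SCORE key. A stand-in sketch of that guard, assuming the AT constant resolves to the job hash's "at" field:

```ruby
# Illustrative stand-in for Locksmith#write_lock_score (AT assumed to be "at").
def write_lock_score(conn, key, item)
  lock_score = item["at"]   # present only for scheduled and retried jobs
  return if lock_score.nil? # immediate jobs: nothing to write

  conn.set(key.score, lock_score)
end
```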
20 changes: 11 additions & 9 deletions lib/sidekiq_unique_jobs/lua/delete.lua
@@ -1,11 +1,13 @@
-------- BEGIN keys ---------
local digest = KEYS[1]
local queued = KEYS[2]
local primed = KEYS[3]
local locked = KEYS[4]
local info = KEYS[5]
local changelog = KEYS[6]
local digests = KEYS[7]
local digest = KEYS[1]
local queued = KEYS[2]
local primed = KEYS[3]
local locked = KEYS[4]
local info = KEYS[5]
local changelog = KEYS[6]
local digests = KEYS[7]
local expiring_digests = KEYS[8]
local score = KEYS[9]
-------- END keys ---------

-------- BEGIN lock arguments ---------
@@ -37,8 +39,8 @@ local count = 0
log_debug("ZREM", digests, digest)
count = count + redis.call("ZREM", digests, digest)

log_debug("UNLINK", digest, queued, primed, locked, info)
count = count + redis.call("UNLINK", digest, queued, primed, locked, info)
log_debug("UNLINK", digest, queued, primed, locked, info, score)
count = count + redis.call("UNLINK", digest, queued, primed, locked, info, score)


log("Deleted (" .. count .. ") keys")
4 changes: 4 additions & 0 deletions lib/sidekiq_unique_jobs/lua/lock.lua
@@ -7,6 +7,7 @@ local info = KEYS[5]
local changelog = KEYS[6]
local digests = KEYS[7]
local expiring_digests = KEYS[8]
local score = KEYS[9]
-------- END keys ---------


@@ -85,6 +86,9 @@ if pttl and pttl > 0 then

log_debug("PEXPIRE", info, pttl)
redis.call("PEXPIRE", info, pttl)

log_debug("PEXPIRE", score, pttl)
redis.call("PEXPIRE", score, pttl)
end

log_debug("PEXPIRE", queued, 1000)
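When the lock carries a TTL, lock.lua now applies the same PEXPIRE to the score key as to the digest and info keys, so a stale score cannot outlive its lock. The effect, shown with plain redis-rb calls (the key name and TTL are illustrative):

```ruby
require "redis"

redis  = Redis.new
digest = "uniquejobs:abc123"

redis.set("#{digest}:SCORE", Time.now.to_f)
redis.pexpire("#{digest}:SCORE", 5_000)

redis.pttl("#{digest}:SCORE") # => roughly 5000, counting down with the rest of the lock keys
```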
16 changes: 9 additions & 7 deletions lib/sidekiq_unique_jobs/lua/locked.lua
@@ -1,11 +1,13 @@
-------- BEGIN keys ---------
local digest = KEYS[1]
local queued = KEYS[2]
local primed = KEYS[3]
local locked = KEYS[4]
local info = KEYS[5]
local changelog = KEYS[6]
local digests = KEYS[7]
local digest = KEYS[1]
local queued = KEYS[2]
local primed = KEYS[3]
local locked = KEYS[4]
local info = KEYS[5]
local changelog = KEYS[6]
local digests = KEYS[7]
local expiring_digests = KEYS[8]
local score = KEYS[9]
-------- END keys ---------

-------- BEGIN lock arguments ---------
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs/lua/queue.lua
@@ -6,6 +6,7 @@ local locked = KEYS[4]
local info = KEYS[5]
local changelog = KEYS[6]
local digests = KEYS[7]
local score = KEYS[8]
-------- END keys ---------


6 changes: 4 additions & 2 deletions lib/sidekiq_unique_jobs/lua/reap_orphans.lua
@@ -77,8 +77,9 @@ repeat
local run_primed = digest .. ":RUN:PRIMED"
local run_locked = digest .. ":RUN:LOCKED"
local run_info = digest .. ":RUN:INFO"
local score = digest .. ":SCORE"

redis.call("UNLINK", digest, queued, primed, locked, info, run_digest, run_queued, run_primed, run_locked, run_info)
redis.call("UNLINK", digest, queued, primed, locked, info, run_digest, run_queued, run_primed, run_locked, run_info, score)

redis.call("ZREM", digests_set, digest)
del_count = del_count + 1
@@ -104,8 +105,9 @@ if del_count < reaper_count then
local run_primed = digest .. ":RUN:PRIMED"
local run_locked = digest .. ":RUN:LOCKED"
local run_info = digest .. ":RUN:INFO"
local score = digest .. ":SCORE"

redis.call("UNLINK", digest, queued, primed, locked, info, run_digest, run_queued, run_primed, run_locked, run_info)
redis.call("UNLINK", digest, queued, primed, locked, info, run_digest, run_queued, run_primed, run_locked, run_info, score)

redis.call("ZREM", expiring_digests_set, digest)
del_count = del_count + 1
22 changes: 16 additions & 6 deletions lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua
@@ -1,19 +1,29 @@
local function delete_from_sorted_set(name, digest)
local per = 50
local total = redis.call("zcard", name)
local index = 0
local offset = 0
local per = 50
local total = redis.call("zcard", name)
local score = redis.call("get", digest .. ":SCORE")
local result

while (index < total) do
local items = redis.call("ZRANGE", name, index, index + per -1)
while (offset < total) do
local items

if score then
items = redis.call("ZRANGE", name, score, "+inf", "BYSCORE", "LIMIT", offset, per)
else
items = redis.call("ZRANGE", name, offset, offset + per -1)
end

for _, item in pairs(items) do
if string.find(item, digest) then
redis.call("ZREM", name, item)
result = item
break
end
end
index = index + per

offset = offset + per
end

return result
end
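
This helper is the reason the score key exists: with a stored score the script can ask Redis for members at or above that timestamp (ZRANGE ... BYSCORE requires Redis 6.2+) instead of string-matching its way through the entire schedule or retry set, and it falls back to the old index scan when no score was recorded. A Ruby rendering of the same idea, assuming a redis-rb client (the helper name and early return are simplifications of the Lua loop):

```ruby
require "redis"

# Illustrative Ruby port of _delete_from_sorted_set.lua: remove the first zset
# member containing `digest`, using the stored score to narrow the scan.
def delete_from_sorted_set(redis, name, digest, per: 50)
  total  = redis.zcard(name)
  score  = redis.get("#{digest}:SCORE")
  offset = 0

  while offset < total
    items =
      if score
        # Jump straight to entries scheduled at or after the stored timestamp.
        redis.zrangebyscore(name, score, "+inf", limit: [offset, per])
      else
        # No score recorded (older locks): fall back to an index scan.
        redis.zrange(name, offset, offset + per - 1)
      end

    items.each do |item|
      if item.include?(digest)
        redis.zrem(name, item)
        return item
      end
    end

    offset += per
  end

  nil
end

# delete_from_sorted_set(Redis.new, "schedule", "uniquejobs:abc123")
```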
20 changes: 11 additions & 9 deletions lib/sidekiq_unique_jobs/lua/unlock.lua
@@ -1,11 +1,13 @@
-------- BEGIN keys ---------
local digest = KEYS[1]
local queued = KEYS[2]
local primed = KEYS[3]
local locked = KEYS[4]
local info = KEYS[5]
local changelog = KEYS[6]
local digests = KEYS[7]
local digest = KEYS[1]
local queued = KEYS[2]
local primed = KEYS[3]
local locked = KEYS[4]
local info = KEYS[5]
local changelog = KEYS[6]
local digests = KEYS[7]
local expiring_digests = KEYS[8]
local score = KEYS[9]
-------- END keys ---------


@@ -68,8 +70,8 @@ redis.call("LREM", primed, -1, job_id)
local redis_version = toversion(redisversion)

if lock_type ~= "until_expired" then
log_debug("UNLINK", digest, info)
redis.call("UNLINK", digest, info)
log_debug("UNLINK", digest, info, score)
redis.call("UNLINK", digest, info, score)

log_debug("HDEL", locked, job_id)
redis.call("HDEL", locked, job_id)
1 change: 1 addition & 0 deletions spec/sidekiq_unique_jobs/key_spec.rb
@@ -28,6 +28,7 @@
uniquejobs:changelog
uniquejobs:digests
uniquejobs:expiring_digests
#{digest_one}:SCORE
],
)
end
7 changes: 4 additions & 3 deletions spec/sidekiq_unique_jobs/lock_spec.rb
@@ -16,6 +16,7 @@
primed_jids: []
locked_jids: []
changelogs: []
score:\s
MESSAGE
end

@@ -45,7 +46,7 @@

it "creates all expected keys in redis" do
create
expect(keys).to contain_exactly(key.digest, key.locked, key.info, key.changelog, key.digests)
expect(keys).to contain_exactly(key.digest, key.locked, key.info, key.changelog, key.digests, key.score)
expect(create.locked_jids).to include(job_id)
end
end
@@ -70,7 +71,7 @@
it "creates keys and adds job_id to locked hash" do
expect { lock }.to change { entity.locked_jids }.to([job_id])

expect(keys).to contain_exactly(key.digest, key.locked, key.info, key.changelog, key.digests)
expect(keys).to contain_exactly(key.digest, key.locked, key.info, key.changelog, key.digests, key.score)
end
end

@@ -82,7 +83,7 @@
it "creates keys and adds job_id to locked hash" do
expect { lock }.to change { entity.locked_jids }.to([job_id])
del
expect(keys).not_to contain_exactly(key.digest, key.locked, key.info, key.changelog, key.digests)
expect(keys).not_to contain_exactly(key.digest, key.locked, key.info, key.changelog, key.digests, key.score)
end
end

