From 27ec05e74816e4111b00ac89f461d062ad884c80 Mon Sep 17 00:00:00 2001 From: Grey Moore Date: Sun, 17 Nov 2024 14:27:53 -0500 Subject: [PATCH] feat: Allow configuring how throttled jobs are pushed back (#150) This adds support for setting the `requeue_strategy` for a job, to specify what we should do with throttled jobs. The default is `:enqueue`, which is the current behavior: re-add it to the end of the queue. The other option is `:schedule`, which schedules the job for a time in the future when we think we'll have capacity to process it. It's also possible to set the `default_requeue_strategy` in the configuration, to set this behavior for all jobs that do not individually specify a `requeue_strategy`. This may be relevant to Issue #36. Unrelatedly, there's a commit in here that changes the format of some `require_relative` lines, to comply with the new Rubocop rule `Style/RedundantCurrentDirectoryInPath`. I don't feel strongly about this commit; I only added it so that rubocop would pass. --------- Signed-off-by: Diego Marcet Co-authored-by: anero Co-authored-by: Diego Marcet Co-authored-by: Alexey Zapparov Co-authored-by: Mauricio Novelo <5916364+mnovelo@users.noreply.github.com> --- lib/sidekiq/throttled.rb | 18 + lib/sidekiq/throttled/config.rb | 26 +- lib/sidekiq/throttled/job.rb | 28 +- lib/sidekiq/throttled/patches/basic_fetch.rb | 11 - lib/sidekiq/throttled/patches/super_fetch.rb | 13 - .../throttled/patches/throttled_retriever.rb | 2 +- lib/sidekiq/throttled/strategy.rb | 101 ++- lib/sidekiq/throttled/strategy/concurrency.rb | 10 + lib/sidekiq/throttled/strategy/threshold.rb | 19 + lib/sidekiq/throttled/strategy_collection.rb | 5 + spec/lib/sidekiq/throttled/job_spec.rb | 90 ++- .../throttled/patches/basic_fetch_spec.rb | 4 +- .../throttled/patches/super_fetch_spec.rb | 4 +- .../throttled/strategy/concurrency_spec.rb | 56 ++ .../throttled/strategy/threshold_spec.rb | 56 ++ spec/lib/sidekiq/throttled/strategy_spec.rb | 736 ++++++++++++++++++ spec/lib/sidekiq/throttled_spec.rb | 33 + spec/support/sidekiq.rb | 5 +- 18 files changed, 1176 insertions(+), 41 deletions(-) diff --git a/lib/sidekiq/throttled.rb b/lib/sidekiq/throttled.rb index 6cefcdb8..67c9016e 100644 --- a/lib/sidekiq/throttled.rb +++ b/lib/sidekiq/throttled.rb @@ -54,6 +54,11 @@ class << self # @return [Cooldown, nil] attr_reader :cooldown + # @api internal + # + # @return [Config, nil] + attr_reader :config + # @example # Sidekiq::Throttled.configure do |config| # config.cooldown_period = nil # Disable queues cooldown manager @@ -87,6 +92,19 @@ def throttled?(message) rescue StandardError false end + + # Return throttled job to be executed later, delegating the details of how to do that + # to the Strategy for that job. + # + # @return [void] + def requeue_throttled(work) + message = JSON.parse(work.job) + job_class = Object.const_get(message.fetch("wrapped") { message.fetch("class") { return false } }) + + Registry.get job_class do |strategy| + strategy.requeue_throttled(work, **job_class.sidekiq_throttled_requeue_options) + end + end end end diff --git a/lib/sidekiq/throttled/config.rb b/lib/sidekiq/throttled/config.rb index 2eadcfea..f21b0a90 100644 --- a/lib/sidekiq/throttled/config.rb +++ b/lib/sidekiq/throttled/config.rb @@ -19,9 +19,18 @@ class Config # @return [Integer] attr_reader :cooldown_threshold + # Specifies how we should return throttled jobs to the queue so they can be executed later. + # Expects a hash with keys that may include :with and :to + # For :with, options are `:enqueue` (put them on the end of the queue) and `:schedule` (schedule for later). + # For :to, the name of a sidekiq queue should be specified. If none is specified, jobs will by default be + # requeued to the same queue they were originally enqueued in. + # Default: {with: `:enqueue`} + # + # @return [Hash] + attr_reader :default_requeue_options + def initialize - @cooldown_period = 1.0 - @cooldown_threshold = 100 + reset! end # @!attribute [w] cooldown_period @@ -39,6 +48,19 @@ def cooldown_threshold=(value) @cooldown_threshold = value end + + # @!attribute [w] default_requeue_options + def default_requeue_options=(options) + requeue_with = options.delete(:with).intern || :enqueue + + @default_requeue_options = options.merge({ with: requeue_with }) + end + + def reset! + @cooldown_period = 1.0 + @cooldown_threshold = 100 + @default_requeue_options = { with: :enqueue } + end end end end diff --git a/lib/sidekiq/throttled/job.rb b/lib/sidekiq/throttled/job.rb index dca50d72..57fcf9e1 100644 --- a/lib/sidekiq/throttled/job.rb +++ b/lib/sidekiq/throttled/job.rb @@ -13,8 +13,9 @@ module Throttled # include Sidekiq::Job # include Sidekiq::Throttled::Job # - # sidekiq_options :queue => :my_queue - # sidekiq_throttle :threshold => { :limit => 123, :period => 1.hour } + # sidkiq_options :queue => :my_queue + # sidekiq_throttle :threshold => { :limit => 123, :period => 1.hour }, + # :requeue => { :to => :other_queue, :with => :schedule } # # def perform # # ... @@ -23,6 +24,8 @@ module Throttled # # @see ClassMethods module Job + VALID_VALUES_FOR_REQUEUE_WITH = %i[enqueue schedule].freeze + # Extends worker class with {ClassMethods}. # # @note Using `included` hook with extending worker with {ClassMethods} @@ -30,6 +33,7 @@ module Job # # @private def self.included(base) + base.sidekiq_class_attribute :sidekiq_throttled_requeue_options base.extend(ClassMethods) end @@ -71,9 +75,29 @@ module ClassMethods # }) # end # + # @example Allow max 123 MyJob jobs per hour; when jobs are throttled, schedule them for later in :other_queue + # + # class MyJob + # include Sidekiq::Job + # include Sidekiq::Throttled::Job + # + # sidekiq_throttle({ + # :threshold => { :limit => 123, :period => 1.hour }, + # :requeue => { :to => :other_queue, :with => :schedule } + # }) + # end + # + # @param [Hash] requeue What to do with jobs that are throttled # @see Registry.add # @return [void] def sidekiq_throttle(**kwargs) + requeue_options = Throttled.config.default_requeue_options.merge(kwargs.delete(:requeue) || {}) + unless VALID_VALUES_FOR_REQUEUE_WITH.include?(requeue_options[:with]) + raise ArgumentError, "requeue: #{requeue_options[:with]} is not a valid value for :with" + end + + self.sidekiq_throttled_requeue_options = requeue_options + Registry.add(self, **kwargs) end diff --git a/lib/sidekiq/throttled/patches/basic_fetch.rb b/lib/sidekiq/throttled/patches/basic_fetch.rb index 83d079be..0cc904b3 100644 --- a/lib/sidekiq/throttled/patches/basic_fetch.rb +++ b/lib/sidekiq/throttled/patches/basic_fetch.rb @@ -15,17 +15,6 @@ def self.prepended(base) private - # Pushes job back to the head of the queue, so that job won't be tried - # immediately after it was requeued (in most cases). - # - # @note This is triggered when job is throttled. So it is same operation - # Sidekiq performs upon `Sidekiq::Worker.perform_async` call. - # - # @return [void] - def requeue_throttled(work) - redis { |conn| conn.lpush(work.queue, work.job) } - end - # Returns list of queues to try to fetch jobs from. # # @note It may return an empty array. diff --git a/lib/sidekiq/throttled/patches/super_fetch.rb b/lib/sidekiq/throttled/patches/super_fetch.rb index c1099cd8..d1d33a5f 100644 --- a/lib/sidekiq/throttled/patches/super_fetch.rb +++ b/lib/sidekiq/throttled/patches/super_fetch.rb @@ -14,19 +14,6 @@ def self.prepended(base) private - # Calls SuperFetch UnitOfWork's requeue to remove the job from the - # temporary queue and push job back to the head of the queue, so that - # the job won't be tried immediately after it was requeued (in most cases). - # - # @note This is triggered when job is throttled. - # - # @return [void] - def requeue_throttled(work) - # SuperFetch UnitOfWork's requeue will remove it from the temporary - # queue and then requeue it, so no acknowledgement call is needed. - work.requeue - end - # Returns list of non-paused queues to try to fetch jobs from. # # @note It may return an empty array. diff --git a/lib/sidekiq/throttled/patches/throttled_retriever.rb b/lib/sidekiq/throttled/patches/throttled_retriever.rb index 353c58fa..87dd3240 100644 --- a/lib/sidekiq/throttled/patches/throttled_retriever.rb +++ b/lib/sidekiq/throttled/patches/throttled_retriever.rb @@ -12,7 +12,7 @@ def retrieve_work if work && Throttled.throttled?(work.job) Throttled.cooldown&.notify_throttled(work.queue) - requeue_throttled(work) + Throttled.requeue_throttled(work) return nil end diff --git a/lib/sidekiq/throttled/strategy.rb b/lib/sidekiq/throttled/strategy.rb index 6a1b1dea..a3681a39 100644 --- a/lib/sidekiq/throttled/strategy.rb +++ b/lib/sidekiq/throttled/strategy.rb @@ -31,7 +31,7 @@ class Strategy # See keyword args of {Strategy::Threshold#initialize} for details. # @param [#call] key_suffix Dynamic key suffix generator. # @param [#call] observer Process called after throttled. - def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil) # rubocop:disable Metrics/MethodLength + def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil) @observer = observer @concurrency = StrategyCollection.new(concurrency, @@ -44,9 +44,7 @@ def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer name: name, key_suffix: key_suffix) - return if @concurrency.any? || @threshold.any? - - raise ArgumentError, "Neither :concurrency nor :threshold given" + raise ArgumentError, "Neither :concurrency nor :threshold given" unless @concurrency.any? || @threshold.any? end # @return [Boolean] whenever strategy has dynamic config @@ -74,18 +72,111 @@ def throttled?(jid, *job_args) false end + # Return throttled job to be executed later. Implementation depends on the value of `with`: + # :enqueue means put the job back at the end of the queue immediately + # :schedule means schedule enqueueing the job for a later time when we expect to have capacity + # + # @param [#to_s, #call] with How to handle the throttled job + # @param [#to_s, #call] to Name of the queue to re-queue the job to. + # If not specified, will use the job's original queue. + # @return [void] + def requeue_throttled(work, with:, to: nil) # rubocop:disable Metrics/MethodLength + # Resolve :with and :to arguments, calling them if they are Procs + job_args = JSON.parse(work.job)["args"] + requeue_with = with.respond_to?(:call) ? with.call(*job_args) : with + target_queue = calc_target_queue(work, to) + + case requeue_with + when :enqueue + re_enqueue_throttled(work, target_queue) + when :schedule + # Find out when we will next be able to execute this job, and reschedule for then. + reschedule_throttled(work, requeue_to: target_queue) + else + raise "unrecognized :with option #{with}" + end + end + # Marks job as being processed. # @return [void] def finalize!(jid, *job_args) @concurrency&.finalize!(jid, *job_args) end - # Resets count of jobs of all avaliable strategies + # Resets count of jobs of all available strategies # @return [void] def reset! @concurrency&.reset! @threshold&.reset! end + + private + + def calc_target_queue(work, to) # rubocop:disable Metrics/MethodLength + target = case to + when Proc, Method + to.call(*JSON.parse(work.job)["args"]) + when NilClass + work.queue + when String, Symbol + to.to_s + else + raise ArgumentError, "Invalid argument for `to`" + end + + target = work.queue if target.nil? || target.empty? + + target.start_with?("queue:") ? target : "queue:#{target}" + end + + # Push the job back to the head of the queue. + def re_enqueue_throttled(work, requeue_to) + case work.class.name + when "Sidekiq::Pro::SuperFetch::UnitOfWork" + # Calls SuperFetch UnitOfWork's requeue to remove the job from the + # temporary queue and push job back to the head of the target queue, so that + # the job won't be tried immediately after it was requeued (in most cases). + work.queue = requeue_to if requeue_to + work.requeue + else + # This is the same operation Sidekiq performs upon `Sidekiq::Worker.perform_async` call. + Sidekiq.redis { |conn| conn.lpush(requeue_to, work.job) } + end + end + + def reschedule_throttled(work, requeue_to:) + message = JSON.parse(work.job) + job_class = message.fetch("wrapped") { message.fetch("class") { return false } } + job_args = message["args"] + + # Re-enqueue the job to the target queue at another time as a NEW unit of work + # AND THEN mark this work as done, so SuperFetch doesn't think this instance is orphaned + # Technically, the job could processed twice if the process dies between the two lines, + # but your job should be idempotent anyway, right? + # The job running twice was already a risk with SuperFetch anyway and this doesn't really increase that risk. + Sidekiq::Client.enqueue_to_in(requeue_to, retry_in(work), Object.const_get(job_class), *job_args) + work.acknowledge + end + + def retry_in(work) + message = JSON.parse(work.job) + jid = message.fetch("jid") { return false } + job_args = message["args"] + + # Ask both concurrency and threshold, if relevant, how long minimum until we can retry. + # If we get two answers, take the longer one. + intervals = [@concurrency&.retry_in(jid, *job_args), @threshold&.retry_in(*job_args)].compact + + raise "Cannot compute a valid retry interval" if intervals.empty? + + interval = intervals.max + + # Add a random amount of jitter, proportional to the length of the minimum retry time. + # This helps spread out jobs more evenly and avoid clumps of jobs on the queue. + interval += rand(interval / 5) if interval > 10 + + interval + end end end end diff --git a/lib/sidekiq/throttled/strategy/concurrency.rb b/lib/sidekiq/throttled/strategy/concurrency.rb index 9d9663c7..2f4e9376 100644 --- a/lib/sidekiq/throttled/strategy/concurrency.rb +++ b/lib/sidekiq/throttled/strategy/concurrency.rb @@ -52,6 +52,16 @@ def throttled?(jid, *job_args) Sidekiq.redis { |redis| 1 == SCRIPT.call(redis, keys: keys, argv: argv) } end + # @return [Float] How long, in seconds, before we'll next be able to take on jobs + def retry_in(_jid, *job_args) + job_limit = limit(job_args) + return 0.0 if !job_limit || count(*job_args) < job_limit + + oldest_jid_with_score = Sidekiq.redis { |redis| redis.zrange(key(job_args), 0, 0, withscores: true) }.first + expiry_time = oldest_jid_with_score.last.to_f + expiry_time - Time.now.to_f + end + # @return [Integer] Current count of jobs def count(*job_args) Sidekiq.redis { |conn| conn.zcard(key(job_args)) }.to_i diff --git a/lib/sidekiq/throttled/strategy/threshold.rb b/lib/sidekiq/throttled/strategy/threshold.rb index 11383d36..095f37e0 100644 --- a/lib/sidekiq/throttled/strategy/threshold.rb +++ b/lib/sidekiq/throttled/strategy/threshold.rb @@ -69,6 +69,25 @@ def throttled?(*job_args) Sidekiq.redis { |redis| 1 == SCRIPT.call(redis, keys: keys, argv: argv) } end + # @return [Float] How long, in seconds, before we'll next be able to take on jobs + def retry_in(*job_args) + job_limit = limit(job_args) + return 0.0 if !job_limit || count(*job_args) < job_limit + + job_period = period(job_args) + job_key = key(job_args) + time_since_oldest = Time.now.to_f - Sidekiq.redis { |redis| redis.lindex(job_key, -1) }.to_f + if time_since_oldest > job_period + # The oldest job on our list is from more than the throttling period ago, + # which means we have not hit the limit this period. + 0.0 + else + # If we can only have X jobs every Y minutes, then wait until Y minutes have elapsed + # since the oldest job on our list. + job_period - time_since_oldest + end + end + # @return [Integer] Current count of jobs def count(*job_args) Sidekiq.redis { |conn| conn.llen(key(job_args)) }.to_i diff --git a/lib/sidekiq/throttled/strategy_collection.rb b/lib/sidekiq/throttled/strategy_collection.rb index 71a8afa7..d6fd0bc4 100644 --- a/lib/sidekiq/throttled/strategy_collection.rb +++ b/lib/sidekiq/throttled/strategy_collection.rb @@ -41,6 +41,11 @@ def throttled?(...) any? { |s| s.throttled?(...) } end + # @return [Float] How long, in seconds, before we'll next be able to take on jobs + def retry_in(*args) + max { |s| s.retry_in(*args) } + end + # Marks job as being processed. # @return [void] def finalize!(...) diff --git a/spec/lib/sidekiq/throttled/job_spec.rb b/spec/lib/sidekiq/throttled/job_spec.rb index fdb4d318..9b6772a0 100644 --- a/spec/lib/sidekiq/throttled/job_spec.rb +++ b/spec/lib/sidekiq/throttled/job_spec.rb @@ -1,7 +1,12 @@ # frozen_string_literal: true RSpec.describe Sidekiq::Throttled::Job do - let(:working_class) { Class.new { include Sidekiq::Throttled::Job } } + let(:working_class) do + Class.new do + include Sidekiq::Job + include Sidekiq::Throttled::Job + end + end it "aliased as Sidekiq::Throttled::Worker" do expect(Sidekiq::Throttled::Worker).to be described_class @@ -13,6 +18,89 @@ .to receive(:add).with(working_class, foo: :bar) working_class.sidekiq_throttle(foo: :bar) + + expect(working_class.sidekiq_throttled_requeue_options).to eq({ with: :enqueue }) + end + + it "accepts and stores a requeue parameter including :with" do + expect(Sidekiq::Throttled::Registry) + .to receive(:add).with(working_class, foo: :bar) + + working_class.sidekiq_throttle(foo: :bar, requeue: { with: :schedule }) + + expect(working_class.sidekiq_throttled_requeue_options).to eq({ with: :schedule }) + end + + it "accepts and stores a requeue parameter including :to" do + expect(Sidekiq::Throttled::Registry) + .to receive(:add).with(working_class, foo: :bar) + + working_class.sidekiq_throttle(foo: :bar, requeue: { to: :other_queue }) + + expect(working_class.sidekiq_throttled_requeue_options).to eq({ to: :other_queue, with: :enqueue }) + end + + it "accepts and stores a requeue parameter including both :to and :with" do + expect(Sidekiq::Throttled::Registry) + .to receive(:add).with(working_class, foo: :bar) + + working_class.sidekiq_throttle(foo: :bar, requeue: { to: :other_queue, with: :schedule }) + + expect(working_class.sidekiq_throttled_requeue_options).to eq({ to: :other_queue, with: :schedule }) + end + + it "raises an error when :with is not a valid value" do + expect do + working_class.sidekiq_throttle(foo: :bar, requeue: { with: :invalid_with_value }) + end.to raise_error(ArgumentError, "requeue: invalid_with_value is not a valid value for :with") + end + + context "when default_requeue_options are set" do + before do + Sidekiq::Throttled.configure do |config| + config.default_requeue_options = { with: :schedule } + end + end + + after do + Sidekiq::Throttled.configure(&:reset!) + end + + it "uses the default when not overridden" do + expect(Sidekiq::Throttled::Registry) + .to receive(:add).with(working_class, foo: :bar) + + working_class.sidekiq_throttle(foo: :bar) + + expect(working_class.sidekiq_throttled_requeue_options).to eq({ with: :schedule }) + end + + it "uses the default alongside a requeue parameter including :to" do + expect(Sidekiq::Throttled::Registry) + .to receive(:add).with(working_class, foo: :bar) + + working_class.sidekiq_throttle(foo: :bar, requeue: { to: :other_queue }) + + expect(working_class.sidekiq_throttled_requeue_options).to eq({ to: :other_queue, with: :schedule }) + end + + it "allows overriding the default" do + expect(Sidekiq::Throttled::Registry) + .to receive(:add).with(working_class, foo: :bar) + + working_class.sidekiq_throttle(foo: :bar, requeue: { with: :enqueue }) + + expect(working_class.sidekiq_throttled_requeue_options).to eq({ with: :enqueue }) + end + + it "allows overriding the default and including a :to parameter" do + expect(Sidekiq::Throttled::Registry) + .to receive(:add).with(working_class, foo: :bar) + + working_class.sidekiq_throttle(foo: :bar, requeue: { to: :other_queue, with: :enqueue }) + + expect(working_class.sidekiq_throttled_requeue_options).to eq({ to: :other_queue, with: :enqueue }) + end end end diff --git a/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb b/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb index d5fa1ebc..de70974b 100644 --- a/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb +++ b/spec/lib/sidekiq/throttled/patches/basic_fetch_spec.rb @@ -49,7 +49,7 @@ fetch.retrieve_work expect { fetch.retrieve_work } - .to change { enqueued_jobs("default") }.to([["TestJob", 2], ["TestJob", 3]]) + .to change { enqueued_jobs("default") }.to([["TestJob", [2]], ["TestJob", [3]]]) .and(keep_unchanged { enqueued_jobs("critical") }) end @@ -65,7 +65,7 @@ it "updates cooldown queues" do expect { fetch.retrieve_work } - .to change { enqueued_jobs("default") }.to([["TestJob", 2], ["TestJob", 3]]) + .to change { enqueued_jobs("default") }.to([["TestJob", [2]], ["TestJob", [3]]]) .and(change { Sidekiq::Throttled.cooldown.queues }.to(["queue:default"])) end diff --git a/spec/lib/sidekiq/throttled/patches/super_fetch_spec.rb b/spec/lib/sidekiq/throttled/patches/super_fetch_spec.rb index 784add02..4bc7219e 100644 --- a/spec/lib/sidekiq/throttled/patches/super_fetch_spec.rb +++ b/spec/lib/sidekiq/throttled/patches/super_fetch_spec.rb @@ -53,7 +53,7 @@ fetch.retrieve_work expect { fetch.retrieve_work } - .to change { enqueued_jobs(base_queue) }.to([["TestJob", 2], ["TestJob", 3]]) + .to change { enqueued_jobs(base_queue) }.to([["TestJob", [2]], ["TestJob", [3]]]) .and(keep_unchanged { enqueued_jobs(critical_queue) }) end @@ -69,7 +69,7 @@ it "updates cooldown queues" do expect { fetch.retrieve_work } - .to change { enqueued_jobs(base_queue) }.to([["TestJob", 2], ["TestJob", 3]]) + .to change { enqueued_jobs(base_queue) }.to([["TestJob", [2]], ["TestJob", [3]]]) .and(change { Sidekiq::Throttled.cooldown.queues }.to(["queue:#{base_queue}"])) end diff --git a/spec/lib/sidekiq/throttled/strategy/concurrency_spec.rb b/spec/lib/sidekiq/throttled/strategy/concurrency_spec.rb index 5995e1c0..c0968bc0 100644 --- a/spec/lib/sidekiq/throttled/strategy/concurrency_spec.rb +++ b/spec/lib/sidekiq/throttled/strategy/concurrency_spec.rb @@ -40,6 +40,62 @@ end end + describe "#retry_in" do + context "when limit is exceeded with all jobs starting just now" do + before { 5.times { strategy.throttled? jid } } + + it "tells us to wait roughly one ttl" do + expect(subject.retry_in(jid)).to be_within(0.1).of(900) + end + end + + context "when limit exceeded, with first job starting 800 seconds ago" do + before do + Timecop.travel(Time.now - 800) do + strategy.throttled? jid + end + 4.times { strategy.throttled? jid } + end + + it "tells us to wait 100 seconds" do + expect(subject.retry_in(jid)).to be_within(0.1).of(100) + end + end + + context "when limit not exceeded, because the oldest job was more than the ttl ago" do + before do + Timecop.travel(Time.now - 1000) do + strategy.throttled? jid + end + 4.times { strategy.throttled? jid } + end + + it "tells us we do not need to wait" do + expect(subject.retry_in(jid)).to eq 0 + end + end + + context "when limit not exceeded, because there are fewer jobs than the limit" do + before do + 4.times { strategy.throttled? jid } + end + + it "tells us we do not need to wait" do + expect(subject.retry_in(jid)).to eq 0 + end + end + + context "when dynamic limit returns nil" do + let(:strategy) { described_class.new :test, limit: proc { |*| } } + + before { 5.times { strategy.throttled? jid } } + + it "tells us we do not need to wait" do + expect(subject.retry_in(jid)).to eq 0 + end + end + end + describe "#count" do subject { strategy.count } diff --git a/spec/lib/sidekiq/throttled/strategy/threshold_spec.rb b/spec/lib/sidekiq/throttled/strategy/threshold_spec.rb index 3ab6d126..ab692be5 100644 --- a/spec/lib/sidekiq/throttled/strategy/threshold_spec.rb +++ b/spec/lib/sidekiq/throttled/strategy/threshold_spec.rb @@ -40,6 +40,62 @@ end end + describe "#retry_in" do + context "when limit exceeded with all jobs happening just now" do + before { 5.times { strategy.throttled? } } + + it "tells us to wait roughly one period" do + expect(subject.retry_in).to be_within(0.1).of(10) + end + end + + context "when limit exceeded, with first job happening 8 seconds ago" do + before do + Timecop.travel(Time.now - 8) do + strategy.throttled? + end + 4.times { strategy.throttled? } + end + + it "tells us to wait 2 seconds" do + expect(subject.retry_in).to be_within(0.1).of(2) + end + end + + context "when limit not exceeded, because the oldest job was more than a period ago" do + before do + Timecop.travel(Time.now - 12) do + strategy.throttled? + end + 4.times { strategy.throttled? } + end + + it "tells us we do not need to wait" do + expect(subject.retry_in).to eq 0 + end + end + + context "when limit not exceeded, because there are fewer jobs than the limit" do + before do + 4.times { strategy.throttled? } + end + + it "tells us we do not need to wait" do + expect(subject.retry_in).to eq 0 + end + end + + context "when there is no limit" do + subject(:strategy) { described_class.new :test, limit: -> {}, period: 10 } + + before { 5.times { strategy.throttled? } } + + it "tells us we do not need to wait" do + expect(subject.retry_in).to eq 0 + end + end + end + describe "#count" do subject { strategy.count } diff --git a/spec/lib/sidekiq/throttled/strategy_spec.rb b/spec/lib/sidekiq/throttled/strategy_spec.rb index 230150e0..4c17d497 100644 --- a/spec/lib/sidekiq/throttled/strategy_spec.rb +++ b/spec/lib/sidekiq/throttled/strategy_spec.rb @@ -7,6 +7,10 @@ let(:concurrency) { { concurrency: { limit: 7 } } } let(:ten_seconds_ago) { Time.now - 10 } + before do + stub_job_class("ThrottledTestJob") + end + describe ".new" do it "fails if neither :threshold nor :concurrency given" do expect { described_class.new(:foo) }.to raise_error ArgumentError @@ -199,6 +203,738 @@ end end + describe "#requeue_throttled" do + def scheduled_redis_item_and_score + Sidekiq.redis do |conn| + # Depending on whether we have redis-client (for Sidekiq 7) or redis-rb (for older Sidekiq), + # zscan takes different arguments + if Gem::Version.new(Sidekiq::VERSION) < Gem::Version.new("7.0.0") + conn.zscan("schedule", 0).last.first + else + conn.zscan("schedule").first + end + end + end + + context "when using Sidekiq Pro's SuperFetch", :sidekiq_pro do + let(:sidekiq_config) do + config = Sidekiq::Config.new(queues: %w[default other_queue]) + config.super_fetch! + config + end + let(:fetcher) { sidekiq_config.default_capsule.fetcher } + + let(:work) { fetcher.retrieve_work } + + before do + pre_existing_job = fetcher.retrieve_work + raise "Found pre-existing job: #{pre_existing_job.inspect}" if pre_existing_job + + # Sidekiq is FIFO queue, with head on right side of the list, + # meaning jobs below will be stored in 3, 2, 1 order. + ThrottledTestJob.perform_bulk([[1], [2], [3]]) + work + end + + describe "with parameter with: :enqueue" do + let(:options) { threshold } + + it "puts the job back on the queue" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + + it "puts the job back on a different queue when specified" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: :other_queue) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to eq([["ThrottledTestJob", [1]]]) + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + + it "accepts a Proc for :with argument" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: ->(_arg) { :enqueue }) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + + it "accepts a Proc for :to argument" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: ->(_arg) { :other_queue }) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to eq([["ThrottledTestJob", [1]]]) + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + + describe "with an invalid :to parameter" do + let(:options) { threshold } + + it "raises an ArgumentError when :to is an invalid type" do + invalid_to_value = 12_345 # Integer is an invalid type for `to` + expect do + subject.requeue_throttled(work, with: :enqueue, to: invalid_to_value) + end.to raise_error(ArgumentError, "Invalid argument for `to`") + end + end + + context "when :to Proc raises an exception" do + let(:options) { threshold } + + it "propagates the exception" do + faulty_proc = ->(*) { raise "Proc error" } + expect do + subject.requeue_throttled(work, with: :enqueue, to: faulty_proc) + end.to raise_error("Proc error") + end + end + end + + describe "with parameter with: :schedule" do + context "when threshold constraints given" do + let(:options) { threshold } + + before do + allow(subject.threshold).to receive(:retry_in).and_return(300.0) + end + + it "reschedules for when the threshold strategy says to, plus some jitter" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:default") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + + it "reschedules for a different queue if specified" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule, to: :other_queue) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:other_queue") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + + it "accepts a Proc for :with argument" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: ->(_arg) { :schedule }) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:default") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + + it "accepts a Proc for :to argument" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule, to: ->(_arg) { :other_queue }) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:other_queue") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + end + + context "when concurrency constraints given" do + let(:options) { concurrency } + + before do + allow(subject.concurrency).to receive(:retry_in).and_return(300.0) + end + + it "reschedules for when the concurrency strategy says to, plus some jitter" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:default") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + end + + context "when threshold and concurrency constraints given" do + let(:options) { threshold.merge concurrency } + + before do + allow(subject.concurrency).to receive(:retry_in).and_return(300.0) + allow(subject.threshold).to receive(:retry_in).and_return(500.0) + end + + it "reschedules for the later of what the two say, plus some jitter" do + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:default") + expect(score.to_f).to be_within(51.0).of(Time.now.to_f + 550.0) + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + end + + describe "with an invalid :to parameter" do + let(:options) { threshold } + + it "raises an ArgumentError when :to is an invalid type" do + invalid_to_value = 12_345 # Integer is an invalid type for `to` + expect do + subject.requeue_throttled(work, with: :schedule, to: invalid_to_value) + end.to raise_error(ArgumentError, "Invalid argument for `to`") + end + end + + context "when :to Proc raises an exception" do + let(:options) { threshold } + + it "propagates the exception" do + faulty_proc = ->(*) { raise "Proc error" } + expect do + subject.requeue_throttled(work, with: :schedule, to: faulty_proc) + end.to raise_error("Proc error") + end + end + end + + describe "with an invalid :with parameter" do + let(:options) { threshold } + + it "raises an error when :with is not a valid value" do + expect { subject.requeue_throttled(work, with: :invalid_with_value) } + .to raise_error(RuntimeError, "unrecognized :with option invalid_with_value") + end + end + + context "when :with is a Proc returning an invalid value" do + let(:options) { threshold } + + it "raises an error when Proc returns an unrecognized value" do + with_proc = ->(*_) { :invalid_value } + expect do + subject.requeue_throttled(work, with: with_proc) + end.to raise_error(RuntimeError, "unrecognized :with option #{with_proc}") + end + end + + context "when :with Proc raises an exception" do + let(:options) { threshold } + + it "propagates the exception" do + faulty_proc = ->(*) { raise "Proc error" } + expect do + subject.requeue_throttled(work, with: faulty_proc) + end.to raise_error("Proc error") + end + end + + context "when :to resolves to nil or empty string" do + let(:options) { threshold } + + it "defaults to work.queue when :to returns nil" do + to_proc = ->(*_) {} + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: to_proc) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + + it "defaults to work.queue when :to returns an empty string" do + to_proc = ->(*_) { "" } + # Ensure that the job was removed from default queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + # And added to the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to eq([["ThrottledTestJob", [1]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: to_proc) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Ensure that the job is no longer in the private queue + expect(enqueued_jobs(fetcher.private_queue("default"))).to be_empty + end + end + + describe "#reschedule_throttled" do + let(:options) { threshold } + + context "when job_class is missing from work.job" do + before do + invalid_job_data = JSON.parse(work.job).tap do |msg| + msg.delete("class") + msg.delete("wrapped") + end + allow(work).to receive(:job).and_return(invalid_job_data.to_json) + end + + it "returns false and does not reschedule the job" do + expect(Sidekiq::Client).not_to receive(:enqueue_to_in) + expect(work).not_to receive(:acknowledge) + expect(subject.send(:reschedule_throttled, work, requeue_to: "queue:default")).to be_falsey + end + end + end + + describe "#retry_in" do + context "when both strategies return nil" do + let(:options) { concurrency.merge(threshold) } + + before do + allow(subject.concurrency).to receive(:retry_in).and_return(nil) + allow(subject.threshold).to receive(:retry_in).and_return(nil) + end + + it "raises an error indicating it cannot compute a valid retry interval" do + expect do + subject.send(:retry_in, work) + end.to raise_error("Cannot compute a valid retry interval") + end + end + end + end + + context "when using Sidekiq BasicFetch" do + let(:sidekiq_config) do + Sidekiq::Config.new(queues: %w[default]) + end + let(:fetcher) { sidekiq_config.default_capsule.fetcher } + + let(:work) { fetcher.retrieve_work } + + before do + pre_existing_job = fetcher.retrieve_work + raise "Found pre-existing job: #{pre_existing_job.inspect}" if pre_existing_job + + # Sidekiq is FIFO queue, with head on right side of the list, + # meaning jobs below will be stored in 3, 2, 1 order. + ThrottledTestJob.perform_bulk([[1], [2], [3]]) + work + end + + describe "with parameter with: :enqueue" do + let(:options) { threshold } + + it "puts the job back on the queue" do + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + end + + it "puts the job back on a different queue when specified" do + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: :other_queue) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to eq([["ThrottledTestJob", [1]]]) + end + + it "accepts a Proc for :with argument" do + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: ->(_arg) { :enqueue }) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + end + + it "accepts a Proc for :to argument" do + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: ->(_arg) { :other_queue }) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to eq([["ThrottledTestJob", [1]]]) + end + + describe "with an invalid :to parameter" do + let(:options) { threshold } + + it "raises an ArgumentError when :to is an invalid type" do + invalid_to_value = 12_345 # Integer is an invalid type for `to` + expect do + subject.requeue_throttled(work, with: :enqueue, to: invalid_to_value) + end.to raise_error(ArgumentError, "Invalid argument for `to`") + end + end + + context "when :to Proc raises an exception" do + let(:options) { threshold } + + it "propagates the exception" do + faulty_proc = ->(*) { raise "Proc error" } + expect do + subject.requeue_throttled(work, with: :enqueue, to: faulty_proc) + end.to raise_error("Proc error") + end + end + end + + describe "with parameter with: :schedule" do + context "when threshold constraints given" do + let(:options) { threshold } + + before do + allow(subject.threshold).to receive(:retry_in).and_return(300.0) + end + + it "reschedules for when the threshold strategy says to, plus some jitter" do + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:default") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + end + + it "reschedules for a different queue if specified" do + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule, to: :other_queue) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:other_queue") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + end + + it "accepts a Proc for :with argument" do + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: ->(_arg) { :schedule }) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:default") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + end + + it "accepts a Proc for :to argument" do + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule, to: ->(_arg) { :other_queue }) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:other_queue") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + end + end + + context "when concurrency constraints given" do + let(:options) { concurrency } + + before do + allow(subject.concurrency).to receive(:retry_in).and_return(300.0) + end + + it "reschedules for when the concurrency strategy says to, plus some jitter" do + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:default") + expect(score.to_f).to be_within(31.0).of(Time.now.to_f + 330.0) + end + end + + context "when threshold and concurrency constraints given" do + let(:options) { threshold.merge concurrency } + + before do + allow(subject.concurrency).to receive(:retry_in).and_return(300.0) + allow(subject.threshold).to receive(:retry_in).and_return(500.0) + end + + it "reschedules for the later of what the two say, plus some jitter" do + # Requeue the work, see that it ends up in 'schedule' + expect do + subject.requeue_throttled(work, with: :schedule) + end.to change { Sidekiq.redis { |conn| conn.zcard("schedule") } }.by(1) + + item, score = scheduled_redis_item_and_score + expect(JSON.parse(item)).to include("class" => "ThrottledTestJob", "args" => [1], + "queue" => "queue:default") + expect(score.to_f).to be_within(51.0).of(Time.now.to_f + 550.0) + end + end + + describe "with an invalid :to parameter" do + let(:options) { threshold } + + it "raises an ArgumentError when :to is an invalid type" do + invalid_to_value = 12_345 # Integer is an invalid type for `to` + expect do + subject.requeue_throttled(work, with: :schedule, to: invalid_to_value) + end.to raise_error(ArgumentError, "Invalid argument for `to`") + end + end + + context "when :to Proc raises an exception" do + let(:options) { threshold } + + it "propagates the exception" do + faulty_proc = ->(*) { raise "Proc error" } + expect do + subject.requeue_throttled(work, with: :schedule, to: faulty_proc) + end.to raise_error("Proc error") + end + end + end + + describe "with an invalid :with parameter" do + let(:options) { threshold } + + it "raises an error when :with is not a valid value" do + expect { subject.requeue_throttled(work, with: :invalid_with_value) } + .to raise_error(RuntimeError, "unrecognized :with option invalid_with_value") + end + end + + context "when :with is a Proc returning an invalid value" do + let(:options) { threshold } + + it "raises an error when Proc returns an unrecognized value" do + with_proc = ->(*_) { :invalid_value } + expect do + subject.requeue_throttled(work, with: with_proc) + end.to raise_error(RuntimeError, "unrecognized :with option #{with_proc}") + end + end + + context "when :with Proc raises an exception" do + let(:options) { threshold } + + it "propagates the exception" do + faulty_proc = ->(*) { raise "Proc error" } + expect do + subject.requeue_throttled(work, with: faulty_proc) + end.to raise_error("Proc error") + end + end + + context "when :to resolves to nil or empty string" do + let(:options) { threshold } + + it "defaults to work.queue when :to returns nil" do + to_proc = ->(*_) {} + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: to_proc) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + end + + it "defaults to work.queue when :to returns an empty string" do + to_proc = ->(*_) { "" } + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [3]], ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + + # Requeue the work + subject.requeue_throttled(work, with: :enqueue, to: to_proc) + + # See that it is now on the end of the queue + expect(enqueued_jobs("default")).to eq([["ThrottledTestJob", [1]], ["ThrottledTestJob", [3]], + ["ThrottledTestJob", [2]]]) + expect(enqueued_jobs("other_queue")).to be_empty + end + end + + describe "#reschedule_throttled" do + let(:options) { threshold } + + context "when job_class is missing from work.job" do + before do + invalid_job_data = JSON.parse(work.job).tap do |msg| + msg.delete("class") + msg.delete("wrapped") + end + allow(work).to receive(:job).and_return(invalid_job_data.to_json) + end + + it "returns false and does not reschedule the job" do + expect(Sidekiq::Client).not_to receive(:enqueue_to_in) + expect(work).not_to receive(:acknowledge) + expect(subject.send(:reschedule_throttled, work, requeue_to: "queue:default")).to be_falsey + end + end + end + + describe "#retry_in" do + context "when both strategies return nil" do + let(:options) { concurrency.merge(threshold) } + + before do + allow(subject.concurrency).to receive(:retry_in).and_return(nil) + allow(subject.threshold).to receive(:retry_in).and_return(nil) + end + + it "raises an error indicating it cannot compute a valid retry interval" do + expect do + subject.send(:retry_in, work) + end.to raise_error("Cannot compute a valid retry interval") + end + end + end + end + end + describe "#reset!" do context "when only concurrency constraint given" do let(:options) { concurrency } diff --git a/spec/lib/sidekiq/throttled_spec.rb b/spec/lib/sidekiq/throttled_spec.rb index b8f6ad9a..8c8ae48f 100644 --- a/spec/lib/sidekiq/throttled_spec.rb +++ b/spec/lib/sidekiq/throttled_spec.rb @@ -2,6 +2,13 @@ require "json" +class ThrottledTestJob + include Sidekiq::Job + include Sidekiq::Throttled::Job + + def perform(*); end +end + RSpec.describe Sidekiq::Throttled do it "registers server middleware" do require "sidekiq/processor" @@ -113,4 +120,30 @@ described_class.throttled? message end end + + describe ".requeue_throttled" do + let(:sidekiq_config) do + if Gem::Version.new(Sidekiq::VERSION) < Gem::Version.new("7.0.0") + Sidekiq::DEFAULTS + else + Sidekiq::Config.new(queues: %w[default]).default_capsule + end + end + + let!(:strategy) { Sidekiq::Throttled::Registry.add("ThrottledTestJob", threshold: { limit: 1, period: 1 }) } + + before do + ThrottledTestJob.sidekiq_throttled_requeue_options = { to: :other_queue, with: :enqueue } + end + + it "invokes requeue_throttled on the strategy" do + payload_jid = jid + job = { class: "ThrottledTestJob", jid: payload_jid.inspect }.to_json + work = Sidekiq::BasicFetch::UnitOfWork.new("queue:default", job, sidekiq_config) + + expect(strategy).to receive(:requeue_throttled).with(work, to: :other_queue, with: :enqueue) + + described_class.requeue_throttled work + end + end end diff --git a/spec/support/sidekiq.rb b/spec/support/sidekiq.rb index e4a19ecd..c8f69b47 100644 --- a/spec/support/sidekiq.rb +++ b/spec/support/sidekiq.rb @@ -58,10 +58,11 @@ def perform(*); end end def enqueued_jobs(queue) + q = queue.start_with?("queue:") ? queue : "queue:#{queue}" Sidekiq.redis do |conn| - conn.lrange("queue:#{queue}", 0, -1).map do |job| + conn.lrange(q, 0, -1).map do |job| JSON.parse(job).then do |payload| - [payload["class"], *payload["args"]] + [payload["class"], payload["args"]] end end end