Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for jitter option #4

Merged
merged 2 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,26 +81,28 @@ class MyJob
include Sidekiq::Job
include Sidekiq::Rescue::Dsl

sidekiq_rescue ExpectedError, delay: 60, limit: 5
sidekiq_rescue ExpectedError, delay: 60, limit: 5, jitter: 0.15

def perform(*)
# ...
end
end
```

The `delay` is not the exact time between retries, but a minimum delay. The actual delay calculates based on retries counter and `delay` value. The formula is `delay + retries * rand(10)` seconds. Randomization is used to avoid retry storms.
The `delay` is not the exact time between retries, but a minimum delay. The actual delay calculates based on jitter and `delay` value. The formula is `delay + delay * jitter * rand` seconds. Randomization is used to avoid retry storms. The `jitter` represents the upper bound of possible wait time (expressed as a percentage) and defaults to 0.15 (15%).

The default values are:
- `delay`: 60 seconds
- `limit`: 5 retries
- `jitter`: 0.15

Delay and limit can be configured globally:
Delay, limit and jitter can be configured globally:

```ruby
Sidekiq::Rescue.configure do |config|
config.delay = 65
config.limit = 10
config.jitter = 0.2
end
```

Expand Down
20 changes: 19 additions & 1 deletion lib/sidekiq/rescue/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ module Rescue
class Config
DEFAULTS = {
delay: 60,
limit: 10
limit: 10,
jitter: 0.15
}.freeze

def initialize
@delay = DEFAULTS[:delay]
@limit = DEFAULTS[:limit]
@jitter = DEFAULTS[:jitter]
@logger = Sidekiq.logger
end

Expand Down Expand Up @@ -45,6 +47,22 @@ def limit=(limit)
@limit = limit
end

# The jitter for the delay.
# @return [Integer, Float]
attr_reader :jitter

# @param jitter [Integer, Float] The jitter for the delay.
# @return [void]
# @raise [ArgumentError] if jitter is not an Integer or Float
def jitter=(jitter)
case jitter
when Integer, Float
@jitter = jitter
else
raise ArgumentError, "jitter must be Integer or Float"
end
end

# The logger instance.
# @return [Logger]
# @note The default logger is Sidekiq's logger.
Expand Down
20 changes: 15 additions & 5 deletions lib/sidekiq/rescue/dsl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ module ClassMethods
# @raise [ArgumentError] if error is not an array of StandardError
# @raise [ArgumentError] if delay is not an Integer or Float
# @raise [ArgumentError] if limit is not an Integer
# @raise [ArgumentError] if jitter is not an Integer or Float
# @example
# sidekiq_rescue NetworkError, delay: 60, limit: 10
def sidekiq_rescue(*errors, delay: Sidekiq::Rescue.config.delay, limit: Sidekiq::Rescue.config.limit)
def sidekiq_rescue(*errors, delay: Sidekiq::Rescue.config.delay, limit: Sidekiq::Rescue.config.limit,
jitter: Sidekiq::Rescue.config.jitter)
unpacked_errors = validate_and_unpack_error_argument(errors)
validate_delay_argument(delay)
validate_limit_argument(limit)
assign_sidekiq_rescue_options(unpacked_errors, delay, limit)
validate_jitter_argument(jitter)
assign_sidekiq_rescue_options(errors: unpacked_errors, delay: delay, limit: limit, jitter: jitter)
end

# Find the error group and options for the given exception.
Expand All @@ -51,7 +54,6 @@ def validate_and_unpack_error_argument(error)
end

def validate_delay_argument(delay)
return if delay.nil?
return if delay.is_a?(Integer) || delay.is_a?(Float)

if delay.is_a?(Proc)
Expand All @@ -68,9 +70,17 @@ def validate_limit_argument(limit)
raise ArgumentError, "limit must be integer" if limit && !limit.is_a?(Integer)
end

def assign_sidekiq_rescue_options(errors, delay, limit)
def validate_jitter_argument(jitter)
return if jitter.is_a?(Integer) || jitter.is_a?(Float)

raise ArgumentError,
"jitter must be integer or float"
end

def assign_sidekiq_rescue_options(errors:, delay:, limit:, jitter:)
self.sidekiq_rescue_options ||= {}
self.sidekiq_rescue_options = self.sidekiq_rescue_options.merge(errors => { delay: delay, limit: limit })
self.sidekiq_rescue_options = self.sidekiq_rescue_options.merge(errors => { delay: delay, limit: limit,
jitter: jitter })
end
end
end
Expand Down
34 changes: 19 additions & 15 deletions lib/sidekiq/rescue/server_middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ def sidekiq_rescue(job_payload, job_class)
end

def rescue_error(error, error_group, options, job_payload)
delay, limit = options.fetch_values(:delay, :limit)
delay, limit, jitter = options.fetch_values(:delay, :limit, :jitter)
rescue_counter = increment_rescue_counter_for(error_group, job_payload)
raise error if rescue_counter > limit

reschedule_at = calculate_reschedule_time(delay, rescue_counter)
log_reschedule_info(rescue_counter, error, reschedule_at)
reschedule_job(job_payload: job_payload, reschedule_at: reschedule_at, rescue_counter: rescue_counter,
calculated_delay = calculate_delay(delay, rescue_counter, jitter)
log_reschedule_info(rescue_counter, error, calculated_delay)
reschedule_job(job_payload: job_payload, delay: calculated_delay, rescue_counter: rescue_counter,
error_group: error_group)
end

Expand All @@ -46,23 +46,27 @@ def increment_rescue_counter_for(error_group, job_payload)
rescue_counter
end

def calculate_reschedule_time(delay, rescue_counter)
# NOTE: we use the retry counter to increase the jitter
# so that the jobs don't retry at the same time
# inspired by sidekiq https://github.com/sidekiq/sidekiq/blob/73c150d0430a8394cadb5cd49218895b113613a0/lib/sidekiq/job_retry.rb#L188
jitter = rand(10) * rescue_counter
def calculate_delay(delay, rescue_counter, jitter)
delay = delay.call(rescue_counter) if delay.is_a?(Proc)
Time.now.to_f + delay + jitter
jitter_delay = calculate_delay_jitter(jitter, delay)
delay + jitter_delay
end

def log_reschedule_info(rescue_counter, error, reschedule_at)
def calculate_delay_jitter(jitter, delay)
return 0.0 if jitter.zero?

jitter * Kernel.rand * delay
end

def log_reschedule_info(rescue_counter, error, delay)
Sidekiq::Rescue.logger.info("[sidekiq_rescue] Job failed #{rescue_counter} times with error: " \
"#{error.message}; rescheduling at #{reschedule_at}")
"#{error.message}; rescheduling in #{delay} seconds")
end

def reschedule_job(job_payload:, reschedule_at:, rescue_counter:, error_group:)
payload = job_payload.merge("at" => reschedule_at,
"sidekiq_rescue_exceptions_counter" => { error_group.to_s => rescue_counter })
def reschedule_job(job_payload:, delay:, rescue_counter:, error_group:)
payload = job_payload.dup
payload["at"] = Time.now.to_f + delay if delay.positive?
payload["sidekiq_rescue_exceptions_counter"] = { error_group.to_s => rescue_counter }
Sidekiq::Client.push(payload)
end
end
Expand Down
13 changes: 13 additions & 0 deletions spec/sidekiq/rescue/config_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@
end
end

describe "#jitter=" do
it "sets the jitter value" do
config = described_class.new
config.jitter = 0.1
expect(config.jitter).to eq(0.1)
end

it "raises an ArgumentError if jitter is not an Integer or Float" do
config = described_class.new
expect { config.jitter = "invalid" }.to raise_error(ArgumentError)
end
end

describe "#logger=" do
it "sets the logger value" do
config = described_class.new
Expand Down
23 changes: 16 additions & 7 deletions spec/sidekiq/rescue/dsl_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def define_dsl(...)
it "sets error and default options" do
define_dsl { sidekiq_rescue TestError }

expect(job_class.sidekiq_rescue_options).to eq({ [TestError] => { delay: 60, limit: 10 } })
expect(job_class.sidekiq_rescue_options).to eq({ [TestError] => { delay: 60, limit: 10, jitter: 0.15 } })
end

it "sets the error classes" do
Expand Down Expand Up @@ -108,6 +108,12 @@ def define_dsl(...)
)
end

it "sets jitter" do
define_dsl { sidekiq_rescue TestError, jitter: 0.1 }

expect(job_class.sidekiq_rescue_options.dig([TestError], :jitter)).to eq(0.1)
end

it "inherits the parent's options in right order" do
parent = Class.new do
include Sidekiq::Job
Expand All @@ -120,9 +126,9 @@ def define_dsl(...)
sidekiq_rescue ChildError, delay: 20
end

expect(parent.sidekiq_rescue_options).to eq({ [ParentError] => { delay: 10, limit: 10 } })
expect(child.sidekiq_rescue_options.to_a).to eq([[[ParentError], { delay: 10, limit: 10 }],
[[ChildError], { delay: 20, limit: 10 }]])
expect(parent.sidekiq_rescue_options).to eq({ [ParentError] => { delay: 10, limit: 10, jitter: 0.15 } })
expect(child.sidekiq_rescue_options.to_a).to eq([[[ParentError], { delay: 10, limit: 10, jitter: 0.15 }],
[[ChildError], { delay: 20, limit: 10, jitter: 0.15 }]])
end
end

Expand All @@ -139,9 +145,12 @@ def define_dsl(...)
sidekiq_rescue ChildError, delay: 20
end

expect(parent.sidekiq_rescue_error_group_with_options_by(ParentError.new).last).to eq(delay: 10, limit: 10)
expect(child.sidekiq_rescue_error_group_with_options_by(ParentError.new).last).to eq(delay: 10, limit: 10)
expect(child.sidekiq_rescue_error_group_with_options_by(ChildError.new).last).to eq(delay: 20, limit: 10)
expect(parent.sidekiq_rescue_error_group_with_options_by(ParentError.new).last).to eq(delay: 10, limit: 10,
jitter: 0.15)
expect(child.sidekiq_rescue_error_group_with_options_by(ParentError.new).last).to eq(delay: 10, limit: 10,
jitter: 0.15)
expect(child.sidekiq_rescue_error_group_with_options_by(ChildError.new).last).to eq(delay: 20, limit: 10,
jitter: 0.15)
end
end
end
41 changes: 38 additions & 3 deletions spec/sidekiq/rescue/server_middleware_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

call_with_expected_error
expect(Sidekiq::Client).to have_received(:push).with(
job_payload.merge("at" => be_within(10).of(Time.now.to_f + 60),
job_payload.merge("at" => be_within(0.15 * 60).of(Time.now.to_f + 60),
"sidekiq_rescue_exceptions_counter" => { "[TestError]" => 1 })
)
end
Expand Down Expand Up @@ -55,7 +55,7 @@

call_with_expected_error_and_delay_proc
expect(Sidekiq::Client).to have_received(:push).with(
job_payload.merge("at" => be_within(20).of(Time.now.to_f + (10 * 2)),
job_payload.merge("at" => be_within(0.15 * 20).of(Time.now.to_f + 20),
"sidekiq_rescue_exceptions_counter" => { "[TestError]" => 2 })
)
end
Expand Down Expand Up @@ -92,9 +92,44 @@

call_with_multiple_errors
expect(Sidekiq::Client).to have_received(:push).with(
job_payload.merge("at" => be_within(10).of(Time.now.to_f + 60),
job_payload.merge("at" => be_within(0.15 * 60).of(Time.now.to_f + 60),
"sidekiq_rescue_exceptions_counter" => { "[TestError, ParentError, ChildError]" => 1 })
)
end
end

context "with alterated jitter" do
subject(:call_with_jitter) do
middleware.call(job_instance, job_payload, "default") { raise TestError }
end

let(:job_instance) { WithCustomJitterJob.new }

it "reschedules the job on expected error and increments counter" do
allow(Sidekiq::Client).to receive(:push)

call_with_jitter
expect(Sidekiq::Client).to have_received(:push).with(
job_payload.merge("at" => be_within(0.15 * 60).of(Time.now.to_f + 60),
"sidekiq_rescue_exceptions_counter" => { "[TestError]" => 1 })
)
end
end

context "with zero jitter and delay" do
subject(:call_with_zero_jitter) do
middleware.call(job_instance, job_payload, "default") { raise TestError }
end

let(:job_instance) { WithZeroJitterAndDelayJob.new }

it "reschedules the job without delay" do
allow(Sidekiq::Client).to receive(:push)

call_with_zero_jitter
expect(Sidekiq::Client).to have_received(:push).with(
job_payload.merge("sidekiq_rescue_exceptions_counter" => { "[TestError]" => 1 })
)
end
end
end
8 changes: 8 additions & 0 deletions spec/support/jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,11 @@ def perform(error_class)
end

ChildJobWithExpectedError = Class.new(WithTestErrorJob)

class WithCustomJitterJob < BaseJob
sidekiq_rescue TestError, jitter: 0.1
end

class WithZeroJitterAndDelayJob < BaseJob
sidekiq_rescue TestError, delay: 0, jitter: 0
end