From 9a3be9edc92b2f4e01d273b1adfbdf4649a9b080 Mon Sep 17 00:00:00 2001 From: Joshua Flanagan Date: Mon, 26 Oct 2015 18:16:45 -0500 Subject: [PATCH 1/9] Throttle calls to a custom configuration loader Many custom configuration loaders will retrieve the configuration from an external resource. `Resque::Pool` asks the loader for the latest configuration roughly once per second. You may want to reduce load on your external resource by caching the value, and only really fetching the latest configuration after a specific amount of time has passed. Instead of forcing each configuration loader author to re-write (and test) this logic, we provide `Resque::Pool::ConfigThrottle`. See the spec for full details of its behavior. --- README.md | 18 +++++ lib/resque/pool/config_throttle.rb | 35 +++++++++ spec/config_throttle_spec.rb | 121 +++++++++++++++++++++++++++++ 3 files changed, 174 insertions(+) create mode 100644 lib/resque/pool/config_throttle.rb create mode 100644 spec/config_throttle_spec.rb diff --git a/README.md b/README.md index cd29078..a1d14ad 100644 --- a/README.md +++ b/README.md @@ -180,6 +180,24 @@ returns. It can optionally implement a `#reset!` method, which will be invoked when the HUP signal is received, allowing the loader to flush its cache, or perform any other re-initialization. +You can reduce the frequency that your configuration loader is called by +wrapping it with `Resque::Pool::ConfigThrottle` and specifying a time (in seconds) +to cache the previous value (see [the spec](spec/config_throttle_spec.rb) for +more details): + +```ruby +task "resque:pool:setup" do + redis_loader = lambda do |env| + worker_count = Redis.current.get("pool_workers_#{env}").to_i + {"queueA,queueB" => worker_count } + end + + # calls through to redis_loader at most once per 10 seconds + Resque::Pool.config_loader = Resque::Pool::ConfigThrottle(10, redis_loader) +end +``` + + Zero-downtime code deploys -------------------------- diff --git a/lib/resque/pool/config_throttle.rb b/lib/resque/pool/config_throttle.rb new file mode 100644 index 0000000..77c958a --- /dev/null +++ b/lib/resque/pool/config_throttle.rb @@ -0,0 +1,35 @@ +module Resque + class Pool + # Throttle the frequency of loading pool configuration + class ConfigThrottle + def initialize(period, config_loader, time_source: Time) + @period = period + @config_loader = config_loader + @resettable = config_loader.respond_to?(:reset!) + @last_check = 0 + @time_source = time_source + end + + def call(env) + # We do not need to cache per `env`, since the value of `env` will not + # change during the life of the process. + if (now > @last_check + @period) + @cache = @config_loader.call(env) + @last_check = now + end + @cache + end + + def reset! + @last_check = 0 + if @resettable + @config_loader.reset! + end + end + + def now + @time_source.now.to_f + end + end + end +end diff --git a/spec/config_throttle_spec.rb b/spec/config_throttle_spec.rb new file mode 100644 index 0000000..374c63d --- /dev/null +++ b/spec/config_throttle_spec.rb @@ -0,0 +1,121 @@ +require 'spec_helper' +require 'resque/pool/config_throttle' + +describe Resque::Pool::ConfigThrottle do + let(:fake_time) { FakeTime.new 1445898807 } + + it "returns the config returned by the wrapped config loader for given env" do + wrapped_config = { + "dev" => {"qA,qB" => 1}, + "prd" => {"qA,qB" => 4} + } + wrapped_loader = lambda {|env| wrapped_config[env] } + throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader) + + throttle.call("prd").should eq({"qA,qB" => 4}) + end + + it "does not call wrapped loader again until the specified period of time has elapsed" do + wrapped_loader = TestConfigLoader.new + wrapped_loader.configuration = {"qA,qB" => 1} + + throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader, time_source: fake_time) + first_call = throttle.call("prd") + + new_config = {"qA,qB" => 22} + wrapped_loader.configuration = new_config + fake_time.advance_time 6 + # config changed, but not enough time elapsed + + second_call = throttle.call("prd") + + second_call.should eq(first_call) + wrapped_loader.times_called.should == 1 + + fake_time.advance_time 6 + # now, enough time has elapsed to retrieve latest config + + third_call = throttle.call("prd") + + third_call.should_not eq(first_call) + third_call.should eq(new_config) + wrapped_loader.times_called.should == 2 + + # further calls continue to use cached value + throttle.call("prd") + throttle.call("prd") + throttle.call("prd") + wrapped_loader.times_called.should == 2 + end + + it "forces a call to the wrapperd loader after reset! called, even if required time hasn't elapsed" do + wrapped_loader = TestConfigLoader.new + wrapped_loader.configuration = {"qA,qB" => 1} + + throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader, time_source: fake_time) + first_call = throttle.call("prd") + + new_config = {"qA,qB" => 22} + wrapped_loader.configuration = new_config + fake_time.advance_time 6 + # the 10 second period has *not* elapsed + + throttle.reset! + + second_call = throttle.call("prd") + + second_call.should eq(new_config) + wrapped_loader.times_called.should == 2 + end + + it "delegates reset! to the wrapped_loader, when supported" do + wrapped_loader = TestConfigLoader.new + throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader) + + wrapped_loader.times_reset.should == 0 + throttle.reset! + wrapped_loader.times_reset.should == 1 + end + + it "does not delegate reset! to the wrapped_loader when it is not supported" do + wrapped_loader = lambda {|env| Hash.new } + throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader) + + expect { + throttle.reset! + }.to_not raise_error + end + + + class TestConfigLoader + attr_accessor :configuration + attr_reader :times_called + attr_reader :times_reset + + def initialize + @times_called = 0 + @times_reset = 0 + end + + def call(env) + @times_called += 1 + configuration + end + + def reset! + @times_reset += 1 + end + end + + class FakeTime + attr_reader :now + + def initialize(start_time) + @now = start_time + end + + def advance_time(seconds) + @now += seconds + end + end +end From a368a2ee0d946fe258aaa704b7b4a466c56e2d5b Mon Sep 17 00:00:00 2001 From: "nicholas a. evans" Date: Tue, 27 Oct 2015 20:52:19 -0400 Subject: [PATCH 2/9] Namespace all ConfigLoaders (we're going to add more) --- lib/resque/pool.rb | 4 +- .../pool/config_loaders/config_throttle.rb | 40 ++++++++++++ .../config_loaders/file_or_hash_loader.rb | 63 +++++++++++++++++++ lib/resque/pool/config_throttle.rb | 35 ----------- lib/resque/pool/file_or_hash_loader.rb | 59 ----------------- .../config_throttle_spec.rb | 24 ++++--- spec/resque_pool_spec.rb | 4 +- 7 files changed, 125 insertions(+), 104 deletions(-) create mode 100644 lib/resque/pool/config_loaders/config_throttle.rb create mode 100644 lib/resque/pool/config_loaders/file_or_hash_loader.rb delete mode 100644 lib/resque/pool/config_throttle.rb delete mode 100644 lib/resque/pool/file_or_hash_loader.rb rename spec/{ => config_loaders}/config_throttle_spec.rb (81%) diff --git a/lib/resque/pool.rb b/lib/resque/pool.rb index 5c99b47..9a76509 100644 --- a/lib/resque/pool.rb +++ b/lib/resque/pool.rb @@ -4,7 +4,7 @@ require 'resque/pool/version' require 'resque/pool/logging' require 'resque/pool/pooled_worker' -require 'resque/pool/file_or_hash_loader' +require 'resque/pool/config_loaders/file_or_hash_loader' require 'erb' require 'fcntl' require 'yaml' @@ -119,7 +119,7 @@ def self.create_configured def init_config(loader) case loader when String, Hash, nil - @config_loader = FileOrHashLoader.new(loader) + @config_loader = ConfigLoaders::FileOrHashLoader.new(loader) else @config_loader = loader end diff --git a/lib/resque/pool/config_loaders/config_throttle.rb b/lib/resque/pool/config_loaders/config_throttle.rb new file mode 100644 index 0000000..174ed01 --- /dev/null +++ b/lib/resque/pool/config_loaders/config_throttle.rb @@ -0,0 +1,40 @@ +module Resque + class Pool + + module ConfigLoaders + + # Throttle the frequency of loading pool configuration + class ConfigThrottle + def initialize(period, config_loader, time_source: Time) + @period = period + @config_loader = config_loader + @resettable = config_loader.respond_to?(:reset!) + @last_check = 0 + @time_source = time_source + end + + def call(env) + # We do not need to cache per `env`, since the value of `env` will not + # change during the life of the process. + if (now > @last_check + @period) + @cache = @config_loader.call(env) + @last_check = now + end + @cache + end + + def reset! + @last_check = 0 + if @resettable + @config_loader.reset! + end + end + + def now + @time_source.now.to_f + end + end + + end + end +end diff --git a/lib/resque/pool/config_loaders/file_or_hash_loader.rb b/lib/resque/pool/config_loaders/file_or_hash_loader.rb new file mode 100644 index 0000000..5d6b108 --- /dev/null +++ b/lib/resque/pool/config_loaders/file_or_hash_loader.rb @@ -0,0 +1,63 @@ +module Resque + class Pool + module ConfigLoaders + + class FileOrHashLoader + def initialize(filename_or_hash=nil) + case filename_or_hash + when String, nil + @filename = filename_or_hash + when Hash + @static_config = filename_or_hash.dup + else + raise "#{self.class} cannot be initialized with #{filename_or_hash.inspect}" + end + end + + def call(environment) + @config ||= load_config_from_file(environment) + end + + def reset! + @config = nil + end + + private + + def load_config_from_file(environment) + if @static_config + new_config = @static_config + else + filename = config_filename + new_config = load_config filename + end + apply_environment new_config, environment + end + + def apply_environment(config, environment) + environment and config[environment] and config.merge!(config[environment]) + config.delete_if {|key, value| value.is_a? Hash } + end + + def config_filename + @filename || choose_config_file + end + + def load_config(filename) + return {} unless filename + YAML.load(ERB.new(IO.read(filename)).result) + end + + CONFIG_FILES = ["resque-pool.yml", "config/resque-pool.yml"] + def choose_config_file + if ENV["RESQUE_POOL_CONFIG"] + ENV["RESQUE_POOL_CONFIG"] + else + CONFIG_FILES.detect { |f| File.exist?(f) } + end + end + end + + end + end +end diff --git a/lib/resque/pool/config_throttle.rb b/lib/resque/pool/config_throttle.rb deleted file mode 100644 index 77c958a..0000000 --- a/lib/resque/pool/config_throttle.rb +++ /dev/null @@ -1,35 +0,0 @@ -module Resque - class Pool - # Throttle the frequency of loading pool configuration - class ConfigThrottle - def initialize(period, config_loader, time_source: Time) - @period = period - @config_loader = config_loader - @resettable = config_loader.respond_to?(:reset!) - @last_check = 0 - @time_source = time_source - end - - def call(env) - # We do not need to cache per `env`, since the value of `env` will not - # change during the life of the process. - if (now > @last_check + @period) - @cache = @config_loader.call(env) - @last_check = now - end - @cache - end - - def reset! - @last_check = 0 - if @resettable - @config_loader.reset! - end - end - - def now - @time_source.now.to_f - end - end - end -end diff --git a/lib/resque/pool/file_or_hash_loader.rb b/lib/resque/pool/file_or_hash_loader.rb deleted file mode 100644 index f6c55b6..0000000 --- a/lib/resque/pool/file_or_hash_loader.rb +++ /dev/null @@ -1,59 +0,0 @@ -module Resque - class Pool - class FileOrHashLoader - def initialize(filename_or_hash=nil) - case filename_or_hash - when String, nil - @filename = filename_or_hash - when Hash - @static_config = filename_or_hash.dup - else - raise "#{self.class} cannot be initialized with #{filename_or_hash.inspect}" - end - end - - def call(environment) - @config ||= load_config_from_file(environment) - end - - def reset! - @config = nil - end - - private - - def load_config_from_file(environment) - if @static_config - new_config = @static_config - else - filename = config_filename - new_config = load_config filename - end - apply_environment new_config, environment - end - - def apply_environment(config, environment) - environment and config[environment] and config.merge!(config[environment]) - config.delete_if {|key, value| value.is_a? Hash } - end - - def config_filename - @filename || choose_config_file - end - - def load_config(filename) - return {} unless filename - YAML.load(ERB.new(IO.read(filename)).result) - end - - CONFIG_FILES = ["resque-pool.yml", "config/resque-pool.yml"] - def choose_config_file - if ENV["RESQUE_POOL_CONFIG"] - ENV["RESQUE_POOL_CONFIG"] - else - CONFIG_FILES.detect { |f| File.exist?(f) } - end - end - end - end -end diff --git a/spec/config_throttle_spec.rb b/spec/config_loaders/config_throttle_spec.rb similarity index 81% rename from spec/config_throttle_spec.rb rename to spec/config_loaders/config_throttle_spec.rb index 374c63d..073207a 100644 --- a/spec/config_throttle_spec.rb +++ b/spec/config_loaders/config_throttle_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' -require 'resque/pool/config_throttle' +require 'resque/pool/config_loaders/config_throttle' -describe Resque::Pool::ConfigThrottle do +describe Resque::Pool::ConfigLoaders::ConfigThrottle do let(:fake_time) { FakeTime.new 1445898807 } it "returns the config returned by the wrapped config loader for given env" do @@ -10,7 +10,9 @@ "prd" => {"qA,qB" => 4} } wrapped_loader = lambda {|env| wrapped_config[env] } - throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader) + throttle = Resque::Pool::ConfigLoaders::ConfigThrottle.new( + 10, wrapped_loader + ) throttle.call("prd").should eq({"qA,qB" => 4}) end @@ -19,7 +21,9 @@ wrapped_loader = TestConfigLoader.new wrapped_loader.configuration = {"qA,qB" => 1} - throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader, time_source: fake_time) + throttle = Resque::Pool::ConfigLoaders::ConfigThrottle.new( + 10, wrapped_loader, time_source: fake_time + ) first_call = throttle.call("prd") new_config = {"qA,qB" => 22} @@ -52,7 +56,9 @@ wrapped_loader = TestConfigLoader.new wrapped_loader.configuration = {"qA,qB" => 1} - throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader, time_source: fake_time) + throttle = Resque::Pool::ConfigLoaders::ConfigThrottle.new( + 10, wrapped_loader, time_source: fake_time + ) first_call = throttle.call("prd") new_config = {"qA,qB" => 22} @@ -70,7 +76,9 @@ it "delegates reset! to the wrapped_loader, when supported" do wrapped_loader = TestConfigLoader.new - throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader) + throttle = Resque::Pool::ConfigLoaders::ConfigThrottle.new( + 10, wrapped_loader + ) wrapped_loader.times_reset.should == 0 throttle.reset! @@ -79,7 +87,9 @@ it "does not delegate reset! to the wrapped_loader when it is not supported" do wrapped_loader = lambda {|env| Hash.new } - throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader) + throttle = Resque::Pool::ConfigLoaders::ConfigThrottle.new( + 10, wrapped_loader + ) expect { throttle.reset! diff --git a/spec/resque_pool_spec.rb b/spec/resque_pool_spec.rb index 405c345..1c1bd56 100644 --- a/spec/resque_pool_spec.rb +++ b/spec/resque_pool_spec.rb @@ -248,7 +248,9 @@ module Rails; end subject { Resque::Pool.create_configured } it "created pools use config file and hash loading logic" do - subject.config_loader.should be_instance_of Resque::Pool::FileOrHashLoader + subject.config_loader.should be_instance_of( + Resque::Pool::ConfigLoaders::FileOrHashLoader + ) end end From a1cba7759903f7e5e81b39728c214c537fc3cc13 Mon Sep 17 00:00:00 2001 From: "nicholas a. evans" Date: Tue, 27 Oct 2015 21:01:13 -0400 Subject: [PATCH 3/9] ConfigLoaders::Throttled (rename and minor tweaks) --- .../{config_throttle.rb => throttled.rb} | 12 +- spec/config_loaders/config_throttle_spec.rb | 131 ------------------ spec/config_loaders/throttled_spec.rb | 125 +++++++++++++++++ 3 files changed, 131 insertions(+), 137 deletions(-) rename lib/resque/pool/config_loaders/{config_throttle.rb => throttled.rb} (80%) delete mode 100644 spec/config_loaders/config_throttle_spec.rb create mode 100644 spec/config_loaders/throttled_spec.rb diff --git a/lib/resque/pool/config_loaders/config_throttle.rb b/lib/resque/pool/config_loaders/throttled.rb similarity index 80% rename from lib/resque/pool/config_loaders/config_throttle.rb rename to lib/resque/pool/config_loaders/throttled.rb index 174ed01..9dd255e 100644 --- a/lib/resque/pool/config_loaders/config_throttle.rb +++ b/lib/resque/pool/config_loaders/throttled.rb @@ -4,10 +4,10 @@ class Pool module ConfigLoaders # Throttle the frequency of loading pool configuration - class ConfigThrottle + class Throttled < SimpleDelegator def initialize(period, config_loader, time_source: Time) + super(config_loader) @period = period - @config_loader = config_loader @resettable = config_loader.respond_to?(:reset!) @last_check = 0 @time_source = time_source @@ -17,7 +17,7 @@ def call(env) # We do not need to cache per `env`, since the value of `env` will not # change during the life of the process. if (now > @last_check + @period) - @cache = @config_loader.call(env) + @cache = super @last_check = now end @cache @@ -25,11 +25,11 @@ def call(env) def reset! @last_check = 0 - if @resettable - @config_loader.reset! - end + super if @resettable end + private + def now @time_source.now.to_f end diff --git a/spec/config_loaders/config_throttle_spec.rb b/spec/config_loaders/config_throttle_spec.rb deleted file mode 100644 index 073207a..0000000 --- a/spec/config_loaders/config_throttle_spec.rb +++ /dev/null @@ -1,131 +0,0 @@ -require 'spec_helper' -require 'resque/pool/config_loaders/config_throttle' - -describe Resque::Pool::ConfigLoaders::ConfigThrottle do - let(:fake_time) { FakeTime.new 1445898807 } - - it "returns the config returned by the wrapped config loader for given env" do - wrapped_config = { - "dev" => {"qA,qB" => 1}, - "prd" => {"qA,qB" => 4} - } - wrapped_loader = lambda {|env| wrapped_config[env] } - throttle = Resque::Pool::ConfigLoaders::ConfigThrottle.new( - 10, wrapped_loader - ) - - throttle.call("prd").should eq({"qA,qB" => 4}) - end - - it "does not call wrapped loader again until the specified period of time has elapsed" do - wrapped_loader = TestConfigLoader.new - wrapped_loader.configuration = {"qA,qB" => 1} - - throttle = Resque::Pool::ConfigLoaders::ConfigThrottle.new( - 10, wrapped_loader, time_source: fake_time - ) - first_call = throttle.call("prd") - - new_config = {"qA,qB" => 22} - wrapped_loader.configuration = new_config - fake_time.advance_time 6 - # config changed, but not enough time elapsed - - second_call = throttle.call("prd") - - second_call.should eq(first_call) - wrapped_loader.times_called.should == 1 - - fake_time.advance_time 6 - # now, enough time has elapsed to retrieve latest config - - third_call = throttle.call("prd") - - third_call.should_not eq(first_call) - third_call.should eq(new_config) - wrapped_loader.times_called.should == 2 - - # further calls continue to use cached value - throttle.call("prd") - throttle.call("prd") - throttle.call("prd") - wrapped_loader.times_called.should == 2 - end - - it "forces a call to the wrapperd loader after reset! called, even if required time hasn't elapsed" do - wrapped_loader = TestConfigLoader.new - wrapped_loader.configuration = {"qA,qB" => 1} - - throttle = Resque::Pool::ConfigLoaders::ConfigThrottle.new( - 10, wrapped_loader, time_source: fake_time - ) - first_call = throttle.call("prd") - - new_config = {"qA,qB" => 22} - wrapped_loader.configuration = new_config - fake_time.advance_time 6 - # the 10 second period has *not* elapsed - - throttle.reset! - - second_call = throttle.call("prd") - - second_call.should eq(new_config) - wrapped_loader.times_called.should == 2 - end - - it "delegates reset! to the wrapped_loader, when supported" do - wrapped_loader = TestConfigLoader.new - throttle = Resque::Pool::ConfigLoaders::ConfigThrottle.new( - 10, wrapped_loader - ) - - wrapped_loader.times_reset.should == 0 - throttle.reset! - wrapped_loader.times_reset.should == 1 - end - - it "does not delegate reset! to the wrapped_loader when it is not supported" do - wrapped_loader = lambda {|env| Hash.new } - throttle = Resque::Pool::ConfigLoaders::ConfigThrottle.new( - 10, wrapped_loader - ) - - expect { - throttle.reset! - }.to_not raise_error - end - - - class TestConfigLoader - attr_accessor :configuration - attr_reader :times_called - attr_reader :times_reset - - def initialize - @times_called = 0 - @times_reset = 0 - end - - def call(env) - @times_called += 1 - configuration - end - - def reset! - @times_reset += 1 - end - end - - class FakeTime - attr_reader :now - - def initialize(start_time) - @now = start_time - end - - def advance_time(seconds) - @now += seconds - end - end -end diff --git a/spec/config_loaders/throttled_spec.rb b/spec/config_loaders/throttled_spec.rb new file mode 100644 index 0000000..a1960fd --- /dev/null +++ b/spec/config_loaders/throttled_spec.rb @@ -0,0 +1,125 @@ +require 'spec_helper' +require 'resque/pool/config_loaders/throttled' + +module Resque::Pool::ConfigLoaders + + describe Throttled do + let(:fake_time) { FakeTime.new 1445898807 } + + it "returns the config returned by the wrapped config loader for given env" do + wrapped_config = { + "dev" => {"qA,qB" => 1}, + "prd" => {"qA,qB" => 4} + } + wrapped_loader = lambda {|env| wrapped_config[env] } + throttle = Throttled.new(10, wrapped_loader) + + throttle.call("prd").should eq({"qA,qB" => 4}) + end + + it "does not call wrapped loader again until the specified period of time has elapsed" do + wrapped_loader = TestConfigLoader.new + wrapped_loader.configuration = {"qA,qB" => 1} + + throttle = Throttled.new(10, wrapped_loader, time_source: fake_time) + first_call = throttle.call("prd") + + new_config = {"qA,qB" => 22} + wrapped_loader.configuration = new_config + fake_time.advance_time 6 + # config changed, but not enough time elapsed + + second_call = throttle.call("prd") + + second_call.should eq(first_call) + wrapped_loader.times_called.should == 1 + + fake_time.advance_time 6 + # now, enough time has elapsed to retrieve latest config + + third_call = throttle.call("prd") + + third_call.should_not eq(first_call) + third_call.should eq(new_config) + wrapped_loader.times_called.should == 2 + + # further calls continue to use cached value + throttle.call("prd") + throttle.call("prd") + throttle.call("prd") + wrapped_loader.times_called.should == 2 + end + + it "forces a call to the wrapperd loader after reset! called, even if required time hasn't elapsed" do + wrapped_loader = TestConfigLoader.new + wrapped_loader.configuration = {"qA,qB" => 1} + + throttle = Throttled.new(10, wrapped_loader, time_source: fake_time) + first_call = throttle.call("prd") + + new_config = {"qA,qB" => 22} + wrapped_loader.configuration = new_config + fake_time.advance_time 6 + # the 10 second period has *not* elapsed + + throttle.reset! + + second_call = throttle.call("prd") + + second_call.should eq(new_config) + wrapped_loader.times_called.should == 2 + end + + it "delegates reset! to the wrapped_loader, when supported" do + wrapped_loader = TestConfigLoader.new + throttle = Throttled.new(10, wrapped_loader) + + wrapped_loader.times_reset.should == 0 + throttle.reset! + wrapped_loader.times_reset.should == 1 + end + + it "does not delegate reset! to the wrapped_loader when it is not supported" do + wrapped_loader = lambda {|env| Hash.new } + throttle = Throttled.new(10, wrapped_loader) + + expect { + throttle.reset! + }.to_not raise_error + end + + + class TestConfigLoader + attr_accessor :configuration + attr_reader :times_called + attr_reader :times_reset + + def initialize + @times_called = 0 + @times_reset = 0 + end + + def call(env) + @times_called += 1 + configuration + end + + def reset! + @times_reset += 1 + end + end + + class FakeTime + attr_reader :now + + def initialize(start_time) + @now = start_time + end + + def advance_time(seconds) + @now += seconds + end + end + end + +end From 4e28a9ca7974e2ecade706cebef3321a091b3799 Mon Sep 17 00:00:00 2001 From: "nicholas a. evans" Date: Wed, 28 Oct 2015 10:46:02 -0400 Subject: [PATCH 4/9] ConfigLoaders::Throttled: made period an optional keyword arg --- lib/resque/pool/config_loaders/throttled.rb | 4 ++- spec/config_loaders/throttled_spec.rb | 38 ++++++++++++++++----- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/lib/resque/pool/config_loaders/throttled.rb b/lib/resque/pool/config_loaders/throttled.rb index 9dd255e..4ab0edc 100644 --- a/lib/resque/pool/config_loaders/throttled.rb +++ b/lib/resque/pool/config_loaders/throttled.rb @@ -4,8 +4,10 @@ class Pool module ConfigLoaders # Throttle the frequency of loading pool configuration + # Defaults to call only once per 10 seconds. class Throttled < SimpleDelegator - def initialize(period, config_loader, time_source: Time) + + def initialize(config_loader, period: 10, time_source: Time) super(config_loader) @period = period @resettable = config_loader.respond_to?(:reset!) diff --git a/spec/config_loaders/throttled_spec.rb b/spec/config_loaders/throttled_spec.rb index a1960fd..2f2bf12 100644 --- a/spec/config_loaders/throttled_spec.rb +++ b/spec/config_loaders/throttled_spec.rb @@ -12,16 +12,16 @@ module Resque::Pool::ConfigLoaders "prd" => {"qA,qB" => 4} } wrapped_loader = lambda {|env| wrapped_config[env] } - throttle = Throttled.new(10, wrapped_loader) + throttle = Throttled.new(wrapped_loader) throttle.call("prd").should eq({"qA,qB" => 4}) end - it "does not call wrapped loader again until the specified period of time has elapsed" do + it "does not call wrapped loader again until the default period of time has elapsed" do wrapped_loader = TestConfigLoader.new wrapped_loader.configuration = {"qA,qB" => 1} - throttle = Throttled.new(10, wrapped_loader, time_source: fake_time) + throttle = Throttled.new(wrapped_loader, time_source: fake_time) first_call = throttle.call("prd") new_config = {"qA,qB" => 22} @@ -50,12 +50,34 @@ module Resque::Pool::ConfigLoaders wrapped_loader.times_called.should == 2 end + it "can specify an alternate cache period" do + config0 = {foo: 2, bar: 1} + config1 = {bar: 3, baz: 9} + config2 = {foo: 4, quux: 1} + wrapped_loader = TestConfigLoader.new + wrapped_loader.configuration = config0 + throttle = Throttled.new( + wrapped_loader, period: 60, time_source: fake_time + ) + throttle.call("prd").should eq(config0) + wrapped_loader.configuration = config1 + fake_time.advance_time 59 + throttle.call("prd").should eq(config0) + fake_time.advance_time 5 + throttle.call("prd").should eq(config1) + wrapped_loader.configuration = config2 + fake_time.advance_time 59 + throttle.call("prd").should eq(config1) + fake_time.advance_time 2 + throttle.call("prd").should eq(config2) + end + it "forces a call to the wrapperd loader after reset! called, even if required time hasn't elapsed" do wrapped_loader = TestConfigLoader.new wrapped_loader.configuration = {"qA,qB" => 1} - throttle = Throttled.new(10, wrapped_loader, time_source: fake_time) - first_call = throttle.call("prd") + throttle = Throttled.new(wrapped_loader, time_source: fake_time) + throttle.call("prd") new_config = {"qA,qB" => 22} wrapped_loader.configuration = new_config @@ -72,7 +94,7 @@ module Resque::Pool::ConfigLoaders it "delegates reset! to the wrapped_loader, when supported" do wrapped_loader = TestConfigLoader.new - throttle = Throttled.new(10, wrapped_loader) + throttle = Throttled.new(wrapped_loader) wrapped_loader.times_reset.should == 0 throttle.reset! @@ -81,14 +103,13 @@ module Resque::Pool::ConfigLoaders it "does not delegate reset! to the wrapped_loader when it is not supported" do wrapped_loader = lambda {|env| Hash.new } - throttle = Throttled.new(10, wrapped_loader) + throttle = Throttled.new(wrapped_loader) expect { throttle.reset! }.to_not raise_error end - class TestConfigLoader attr_accessor :configuration attr_reader :times_called @@ -120,6 +141,7 @@ def advance_time(seconds) @now += seconds end end + end end From 50eb527442e82f38806e21ed77dd27dd3daee764 Mon Sep 17 00:00:00 2001 From: "nicholas a. evans" Date: Wed, 28 Oct 2015 12:28:59 -0400 Subject: [PATCH 5/9] ruby 2.0 SimpleDelegator needs 'delegate' loaded --- lib/resque/pool/config_loaders/throttled.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/resque/pool/config_loaders/throttled.rb b/lib/resque/pool/config_loaders/throttled.rb index 4ab0edc..4e535e4 100644 --- a/lib/resque/pool/config_loaders/throttled.rb +++ b/lib/resque/pool/config_loaders/throttled.rb @@ -1,3 +1,5 @@ +require "delegate" + module Resque class Pool From 9f362fdd3465436fcad92d7139b14f220118c9c6 Mon Sep 17 00:00:00 2001 From: "nicholas a. evans" Date: Wed, 28 Oct 2015 10:27:23 -0400 Subject: [PATCH 6/9] Switch from sprinkled requires to autoload n.b. this cleans up the resque/pool/pooled_worker monkeypatching a little bit. It runs only on `Resque::Pool.new` now. --- bin/resque-pool | 2 +- lib/resque/pool.rb | 16 +++++++++------- lib/resque/pool/cli.rb | 3 +-- lib/resque/pool/config_loaders.rb | 12 ++++++++++++ .../pool/config_loaders/file_or_hash_loader.rb | 3 +++ lib/resque/pool/pooled_worker.rb | 12 ++++++++---- 6 files changed, 34 insertions(+), 14 deletions(-) create mode 100644 lib/resque/pool/config_loaders.rb diff --git a/bin/resque-pool b/bin/resque-pool index c3876c0..e6dfbb8 100755 --- a/bin/resque-pool +++ b/bin/resque-pool @@ -3,5 +3,5 @@ $LOAD_PATH.unshift File.expand_path('../../lib', __FILE__) -require 'resque/pool/cli' +require 'resque/pool' Resque::Pool::CLI.run diff --git a/lib/resque/pool.rb b/lib/resque/pool.rb index 9a76509..11c2ec6 100644 --- a/lib/resque/pool.rb +++ b/lib/resque/pool.rb @@ -1,16 +1,18 @@ # -*- encoding: utf-8 -*- require 'resque' require 'resque/worker' -require 'resque/pool/version' -require 'resque/pool/logging' -require 'resque/pool/pooled_worker' -require 'resque/pool/config_loaders/file_or_hash_loader' -require 'erb' require 'fcntl' -require 'yaml' +require 'socket' module Resque class Pool + autoload :CLI, "resque/pool/cli" + autoload :ConfigLoaders, "resque/pool/config_loaders" + autoload :Killer, "resque/pool/killer" + autoload :Logging, "resque/pool/logging" + autoload :PooledWorker, "resque/pool/pooled_worker" + autoload :VERSION, "resque/pool/version" + SIG_QUEUE_MAX_SIZE = 5 DEFAULT_WORKER_INTERVAL = 5 QUEUE_SIGS = [ :QUIT, :INT, :TERM, :USR1, :USR2, :CONT, :HUP, :WINCH, ] @@ -23,6 +25,7 @@ class Pool attr_reader :workers def initialize(config_loader=nil) + PooledWorker.monkey_patch_resque_worker! init_config(config_loader) @workers = Hash.new { |workers, queues| workers[queues] = {} } procline "(initialized)" @@ -89,7 +92,6 @@ def self.handle_winch=(bool) end def self.kill_other_pools! - require 'resque/pool/killer' Resque::Pool::Killer.run end diff --git a/lib/resque/pool/cli.rb b/lib/resque/pool/cli.rb index f86bcdc..cf1515e 100644 --- a/lib/resque/pool/cli.rb +++ b/lib/resque/pool/cli.rb @@ -1,6 +1,5 @@ -require 'optparse' require 'resque/pool' -require 'resque/pool/logging' +require 'optparse' require 'fileutils' module Resque diff --git a/lib/resque/pool/config_loaders.rb b/lib/resque/pool/config_loaders.rb new file mode 100644 index 0000000..5032db4 --- /dev/null +++ b/lib/resque/pool/config_loaders.rb @@ -0,0 +1,12 @@ +module Resque + class Pool + + # Namespace for various pre-packaged config loaders or loader decorators. + module ConfigLoaders + + autoload :FileOrHashLoader, "resque/pool/config_loaders/file_or_hash_loader" + autoload :Throttled, "resque/pool/config_loaders/throttled" + + end + end +end diff --git a/lib/resque/pool/config_loaders/file_or_hash_loader.rb b/lib/resque/pool/config_loaders/file_or_hash_loader.rb index 5d6b108..7195501 100644 --- a/lib/resque/pool/config_loaders/file_or_hash_loader.rb +++ b/lib/resque/pool/config_loaders/file_or_hash_loader.rb @@ -1,3 +1,6 @@ +require 'erb' +require 'yaml' + module Resque class Pool module ConfigLoaders diff --git a/lib/resque/pool/pooled_worker.rb b/lib/resque/pool/pooled_worker.rb index f1281c8..8c87f80 100644 --- a/lib/resque/pool/pooled_worker.rb +++ b/lib/resque/pool/pooled_worker.rb @@ -33,9 +33,13 @@ def self.included(base) end end - end -end + def self.monkey_patch_resque_worker! + return if @patched_once + Resque::Worker.class_eval do + include Resque::Pool::PooledWorker + end + @patched_once = true + end -Resque::Worker.class_eval do - include Resque::Pool::PooledWorker + end end From 5a04b87b9db4b2d9aa0cd522426c5fdbd1155908 Mon Sep 17 00:00:00 2001 From: "nicholas a. evans" Date: Tue, 27 Oct 2015 21:05:29 -0400 Subject: [PATCH 7/9] spec config cleanup --- spec/resque_pool_spec.rb | 11 ----------- spec/spec_helper.rb | 12 ++++++++++++ 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/spec/resque_pool_spec.rb b/spec/resque_pool_spec.rb index 1c1bd56..365188a 100644 --- a/spec/resque_pool_spec.rb +++ b/spec/resque_pool_spec.rb @@ -1,16 +1,5 @@ require 'spec_helper' -RSpec.configure do |config| - config.include PoolSpecHelpers - config.after { - Object.send(:remove_const, :RAILS_ENV) if defined? RAILS_ENV - ENV.delete 'RACK_ENV' - ENV.delete 'RAILS_ENV' - ENV.delete 'RESQUE_ENV' - ENV.delete 'RESQUE_POOL_CONFIG' - } -end - describe Resque::Pool, "when loading a simple pool configuration" do let(:config) do { 'foo' => 1, 'bar' => 2, 'foo,bar' => 3, 'bar,foo' => 4, } diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 6613447..2ae2478 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -13,3 +13,15 @@ def simulate_signal(pool, signal) pool.handle_sig_queue! end end + +RSpec.configure do |config| + config.include PoolSpecHelpers + config.after { + Object.send(:remove_const, :RAILS_ENV) if defined? RAILS_ENV + ENV.delete 'RACK_ENV' + ENV.delete 'RAILS_ENV' + ENV.delete 'RESQUE_ENV' + ENV.delete 'RESQUE_POOL_CONFIG' + } +end + From b93aa18c1c0978926691ac644eb2eff09ddf3f8b Mon Sep 17 00:00:00 2001 From: "nicholas a. evans" Date: Wed, 28 Oct 2015 11:48:03 -0400 Subject: [PATCH 8/9] Add/document Resque::Pool.pool_name and app_name --- lib/resque/pool.rb | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/lib/resque/pool.rb b/lib/resque/pool.rb index 11c2ec6..0438e46 100644 --- a/lib/resque/pool.rb +++ b/lib/resque/pool.rb @@ -78,15 +78,27 @@ def call_#{name}!(*args) # }}} # Config: class methods to start up the pool using the config loader {{{ - class << self; attr_accessor :config_loader, :app_name, :spawn_delay; end + class << self + attr_accessor :config_loader, :app_name, :pool_name, :spawn_delay + end + # Intended to represent the running application/codebase. Should be shared + # from one deploy to the next and across hosts. def self.app_name @app_name ||= File.basename(Dir.pwd) end + # Represents a single running pool. Usually unique per host, so it defaults + # to hostname, but you can set it e.g. to something unique for running + # multiple pools per host. + def self.pool_name + @pool_name ||= Socket.gethostname + end + def self.handle_winch? @handle_winch ||= false end + def self.handle_winch=(bool) @handle_winch = bool end From e8000cd9be082a5c9470aa3e9c31682595eefd67 Mon Sep 17 00:00:00 2001 From: "nicholas a. evans" Date: Wed, 28 Oct 2015 11:49:28 -0400 Subject: [PATCH 9/9] ConfigLoaders::Redis: basic redis-backed configuration Intended to be wrapped by `Throttled` config loader. n.b. if you use this, you will need to reset the redis configuration in your `after_prefork` hook. (Until #135 handles it automatically.) --- lib/resque/pool/config_loaders.rb | 1 + lib/resque/pool/config_loaders/redis.rb | 65 ++++++++++++++ spec/config_loaders/redis_spec.rb | 110 ++++++++++++++++++++++++ 3 files changed, 176 insertions(+) create mode 100644 lib/resque/pool/config_loaders/redis.rb create mode 100644 spec/config_loaders/redis_spec.rb diff --git a/lib/resque/pool/config_loaders.rb b/lib/resque/pool/config_loaders.rb index 5032db4..cfdb6c4 100644 --- a/lib/resque/pool/config_loaders.rb +++ b/lib/resque/pool/config_loaders.rb @@ -5,6 +5,7 @@ class Pool module ConfigLoaders autoload :FileOrHashLoader, "resque/pool/config_loaders/file_or_hash_loader" + autoload :Redis, "resque/pool/config_loaders/redis" autoload :Throttled, "resque/pool/config_loaders/throttled" end diff --git a/lib/resque/pool/config_loaders/redis.rb b/lib/resque/pool/config_loaders/redis.rb new file mode 100644 index 0000000..f54e54b --- /dev/null +++ b/lib/resque/pool/config_loaders/redis.rb @@ -0,0 +1,65 @@ +require "resque" +require "resque/pool" + +module Resque + class Pool + module ConfigLoaders + + # Read/write pool config from redis. + # Should be wrapped in +ConfigLoaders::Throttled+. + # + # n.b. The environment needs to be passed in up-front, and will be ignored + # during +call+. + class Redis + attr_reader :redis + attr_reader :app, :pool, :env, :name + + def initialize(app_name: Pool.app_name, + pool_name: Pool.pool_name, + environment: "unknown", + config_name: "config", + redis: Resque.redis) + @app = app_name + @pool = pool_name + @env = environment + @name = config_name + @redis = redis + end + + # n.b. environment must be set up-front and will be ignored here. + def call(_) + redis.hgetall(key).tap do |h| + h.each do |k,v| + h[k] = v.to_i + end + end + end + + # read individual worker config + def [](worker) + redis.hget(key, worker).to_i + end + + # write individual worker config + def []=(worker, count) + redis.hset(key, worker, count.to_i) + end + + # remove worker config + def delete(worker) + redis.multi do + redis.hget(key, worker) + redis.hdel(key, worker) + end.first.to_i + end + + # n.b. this is probably namespaced under +resque+ + def key + @key ||= ["pool", "config", app, pool, env, name].join(":") + end + + end + + end + end +end diff --git a/spec/config_loaders/redis_spec.rb b/spec/config_loaders/redis_spec.rb new file mode 100644 index 0000000..6713c00 --- /dev/null +++ b/spec/config_loaders/redis_spec.rb @@ -0,0 +1,110 @@ +require 'spec_helper' +require 'resque/pool/config_loaders/redis' + +module Resque::Pool::ConfigLoaders + + describe Redis do + before(:each) do + Resque.redis.flushdb + expect(Resque.redis.keys.count).to eq(0) + end + + after(:all) do + Resque.redis.flushdb + end + + subject(:config) { Redis.new(environment: env) } + subject(:env) { "prd" } + + describe "initialization" do + it "uses default app_name and pool_name from Resque::Pool" do + expect(Redis.new.app).to eq(Resque::Pool.app_name) + expect(Redis.new.pool).to eq(Resque::Pool.pool_name) + end + it "uses default 'unknown' environment" do + expect(Redis.new.env).to eq("unknown") + end + it "uses default 'config' name" do + expect(Redis.new.name).to eq("config") + end + it "constructs redis key (expecting to be namespaced under resque)" do + config = Redis.new(app_name: "foo", + pool_name: "bar", + environment: "dev", + config_name: "override") + expect(config.key).to eq("pool:config:foo:bar:dev:override") + end + it "uses resque's redis connection (probably namespaced)" do + expect(Redis.new.redis).to eq(Resque.redis) + expect(Redis.new(redis: :another).redis).to eq(:another) + end + end + + describe "basic API" do + + it "starts out empty" do + expect(config.call(env)).to eq({}) + end + + it "has hash-like index setters" do + config["foo"] = 2 + config["bar"] = 3 + config["numbers_only"] = "elephant" + expect(config.call(env)).to eq({ + "foo" => 2, + "bar" => 3, + "numbers_only" => 0, + }) + end + + it "has indifferent access (but call returns string keys)" do + config[:foo] = 1 + config["foo"] = 2 + expect(config[:foo]).to eq(2) + expect(config.call(env)).to eq("foo" => 2) + end + + it "has hash-like index getters" do + config["foo"] = 86 + config["bar"] = 99 + expect(config["foo"]).to eq(86) + expect(config["bar"]).to eq(99) + expect(config["nonexistent"]).to eq(0) + end + + it "can remove keys (not just set them to zero)" do + config["foo"] = 99 + config["bar"] = 7 + expect(config.delete("foo")).to eq(99) + expect(config.call(env)).to eq("bar" => 7) + end + + end + + describe "persistance" do + + it "can be loaded from another instance" do + config["qA"] = 24 + config["qB"] = 33 + config2 = Redis.new environment: env + expect(config2.call(env)).to eq("qA" => 24, "qB" => 33) + end + + it "won't clash with different configs" do + config[:foo] = 1 + config[:bar] = 2 + config2 = Redis.new app_name: "another" + expect(config2.call(env)).to eq({}) + config3 = Redis.new pool_name: "another" + expect(config3.call(env)).to eq({}) + config4 = Redis.new config_name: "another" + expect(config4.call(env)).to eq({}) + config5 = Redis.new environment: "another" + expect(config5.call(env)).to eq({}) + end + + end + + end + +end