diff --git a/README.md b/README.md index cd29078..a1d14ad 100644 --- a/README.md +++ b/README.md @@ -180,6 +180,24 @@ returns. It can optionally implement a `#reset!` method, which will be invoked when the HUP signal is received, allowing the loader to flush its cache, or perform any other re-initialization. +You can reduce the frequency that your configuration loader is called by +wrapping it with `Resque::Pool::ConfigThrottle` and specifying a time (in seconds) +to cache the previous value (see [the spec](spec/config_throttle_spec.rb) for +more details): + +```ruby +task "resque:pool:setup" do + redis_loader = lambda do |env| + worker_count = Redis.current.get("pool_workers_#{env}").to_i + {"queueA,queueB" => worker_count } + end + + # calls through to redis_loader at most once per 10 seconds + Resque::Pool.config_loader = Resque::Pool::ConfigThrottle(10, redis_loader) +end +``` + + Zero-downtime code deploys -------------------------- diff --git a/bin/resque-pool b/bin/resque-pool index c3876c0..e6dfbb8 100755 --- a/bin/resque-pool +++ b/bin/resque-pool @@ -3,5 +3,5 @@ $LOAD_PATH.unshift File.expand_path('../../lib', __FILE__) -require 'resque/pool/cli' +require 'resque/pool' Resque::Pool::CLI.run diff --git a/lib/resque/pool.rb b/lib/resque/pool.rb index 5c99b47..0438e46 100644 --- a/lib/resque/pool.rb +++ b/lib/resque/pool.rb @@ -1,16 +1,18 @@ # -*- encoding: utf-8 -*- require 'resque' require 'resque/worker' -require 'resque/pool/version' -require 'resque/pool/logging' -require 'resque/pool/pooled_worker' -require 'resque/pool/file_or_hash_loader' -require 'erb' require 'fcntl' -require 'yaml' +require 'socket' module Resque class Pool + autoload :CLI, "resque/pool/cli" + autoload :ConfigLoaders, "resque/pool/config_loaders" + autoload :Killer, "resque/pool/killer" + autoload :Logging, "resque/pool/logging" + autoload :PooledWorker, "resque/pool/pooled_worker" + autoload :VERSION, "resque/pool/version" + SIG_QUEUE_MAX_SIZE = 5 DEFAULT_WORKER_INTERVAL = 5 QUEUE_SIGS = [ :QUIT, :INT, :TERM, :USR1, :USR2, :CONT, :HUP, :WINCH, ] @@ -23,6 +25,7 @@ class Pool attr_reader :workers def initialize(config_loader=nil) + PooledWorker.monkey_patch_resque_worker! init_config(config_loader) @workers = Hash.new { |workers, queues| workers[queues] = {} } procline "(initialized)" @@ -75,21 +78,32 @@ def call_#{name}!(*args) # }}} # Config: class methods to start up the pool using the config loader {{{ - class << self; attr_accessor :config_loader, :app_name, :spawn_delay; end + class << self + attr_accessor :config_loader, :app_name, :pool_name, :spawn_delay + end + # Intended to represent the running application/codebase. Should be shared + # from one deploy to the next and across hosts. def self.app_name @app_name ||= File.basename(Dir.pwd) end + # Represents a single running pool. Usually unique per host, so it defaults + # to hostname, but you can set it e.g. to something unique for running + # multiple pools per host. + def self.pool_name + @pool_name ||= Socket.gethostname + end + def self.handle_winch? @handle_winch ||= false end + def self.handle_winch=(bool) @handle_winch = bool end def self.kill_other_pools! - require 'resque/pool/killer' Resque::Pool::Killer.run end @@ -119,7 +133,7 @@ def self.create_configured def init_config(loader) case loader when String, Hash, nil - @config_loader = FileOrHashLoader.new(loader) + @config_loader = ConfigLoaders::FileOrHashLoader.new(loader) else @config_loader = loader end diff --git a/lib/resque/pool/cli.rb b/lib/resque/pool/cli.rb index f86bcdc..cf1515e 100644 --- a/lib/resque/pool/cli.rb +++ b/lib/resque/pool/cli.rb @@ -1,6 +1,5 @@ -require 'optparse' require 'resque/pool' -require 'resque/pool/logging' +require 'optparse' require 'fileutils' module Resque diff --git a/lib/resque/pool/config_loaders.rb b/lib/resque/pool/config_loaders.rb new file mode 100644 index 0000000..cfdb6c4 --- /dev/null +++ b/lib/resque/pool/config_loaders.rb @@ -0,0 +1,13 @@ +module Resque + class Pool + + # Namespace for various pre-packaged config loaders or loader decorators. + module ConfigLoaders + + autoload :FileOrHashLoader, "resque/pool/config_loaders/file_or_hash_loader" + autoload :Redis, "resque/pool/config_loaders/redis" + autoload :Throttled, "resque/pool/config_loaders/throttled" + + end + end +end diff --git a/lib/resque/pool/config_loaders/file_or_hash_loader.rb b/lib/resque/pool/config_loaders/file_or_hash_loader.rb new file mode 100644 index 0000000..7195501 --- /dev/null +++ b/lib/resque/pool/config_loaders/file_or_hash_loader.rb @@ -0,0 +1,66 @@ +require 'erb' +require 'yaml' + +module Resque + class Pool + module ConfigLoaders + + class FileOrHashLoader + def initialize(filename_or_hash=nil) + case filename_or_hash + when String, nil + @filename = filename_or_hash + when Hash + @static_config = filename_or_hash.dup + else + raise "#{self.class} cannot be initialized with #{filename_or_hash.inspect}" + end + end + + def call(environment) + @config ||= load_config_from_file(environment) + end + + def reset! + @config = nil + end + + private + + def load_config_from_file(environment) + if @static_config + new_config = @static_config + else + filename = config_filename + new_config = load_config filename + end + apply_environment new_config, environment + end + + def apply_environment(config, environment) + environment and config[environment] and config.merge!(config[environment]) + config.delete_if {|key, value| value.is_a? Hash } + end + + def config_filename + @filename || choose_config_file + end + + def load_config(filename) + return {} unless filename + YAML.load(ERB.new(IO.read(filename)).result) + end + + CONFIG_FILES = ["resque-pool.yml", "config/resque-pool.yml"] + def choose_config_file + if ENV["RESQUE_POOL_CONFIG"] + ENV["RESQUE_POOL_CONFIG"] + else + CONFIG_FILES.detect { |f| File.exist?(f) } + end + end + end + + end + end +end diff --git a/lib/resque/pool/config_loaders/redis.rb b/lib/resque/pool/config_loaders/redis.rb new file mode 100644 index 0000000..f54e54b --- /dev/null +++ b/lib/resque/pool/config_loaders/redis.rb @@ -0,0 +1,65 @@ +require "resque" +require "resque/pool" + +module Resque + class Pool + module ConfigLoaders + + # Read/write pool config from redis. + # Should be wrapped in +ConfigLoaders::Throttled+. + # + # n.b. The environment needs to be passed in up-front, and will be ignored + # during +call+. + class Redis + attr_reader :redis + attr_reader :app, :pool, :env, :name + + def initialize(app_name: Pool.app_name, + pool_name: Pool.pool_name, + environment: "unknown", + config_name: "config", + redis: Resque.redis) + @app = app_name + @pool = pool_name + @env = environment + @name = config_name + @redis = redis + end + + # n.b. environment must be set up-front and will be ignored here. + def call(_) + redis.hgetall(key).tap do |h| + h.each do |k,v| + h[k] = v.to_i + end + end + end + + # read individual worker config + def [](worker) + redis.hget(key, worker).to_i + end + + # write individual worker config + def []=(worker, count) + redis.hset(key, worker, count.to_i) + end + + # remove worker config + def delete(worker) + redis.multi do + redis.hget(key, worker) + redis.hdel(key, worker) + end.first.to_i + end + + # n.b. this is probably namespaced under +resque+ + def key + @key ||= ["pool", "config", app, pool, env, name].join(":") + end + + end + + end + end +end diff --git a/lib/resque/pool/config_loaders/throttled.rb b/lib/resque/pool/config_loaders/throttled.rb new file mode 100644 index 0000000..4e535e4 --- /dev/null +++ b/lib/resque/pool/config_loaders/throttled.rb @@ -0,0 +1,44 @@ +require "delegate" + +module Resque + class Pool + + module ConfigLoaders + + # Throttle the frequency of loading pool configuration + # Defaults to call only once per 10 seconds. + class Throttled < SimpleDelegator + + def initialize(config_loader, period: 10, time_source: Time) + super(config_loader) + @period = period + @resettable = config_loader.respond_to?(:reset!) + @last_check = 0 + @time_source = time_source + end + + def call(env) + # We do not need to cache per `env`, since the value of `env` will not + # change during the life of the process. + if (now > @last_check + @period) + @cache = super + @last_check = now + end + @cache + end + + def reset! + @last_check = 0 + super if @resettable + end + + private + + def now + @time_source.now.to_f + end + end + + end + end +end diff --git a/lib/resque/pool/file_or_hash_loader.rb b/lib/resque/pool/file_or_hash_loader.rb deleted file mode 100644 index f6c55b6..0000000 --- a/lib/resque/pool/file_or_hash_loader.rb +++ /dev/null @@ -1,59 +0,0 @@ -module Resque - class Pool - class FileOrHashLoader - def initialize(filename_or_hash=nil) - case filename_or_hash - when String, nil - @filename = filename_or_hash - when Hash - @static_config = filename_or_hash.dup - else - raise "#{self.class} cannot be initialized with #{filename_or_hash.inspect}" - end - end - - def call(environment) - @config ||= load_config_from_file(environment) - end - - def reset! - @config = nil - end - - private - - def load_config_from_file(environment) - if @static_config - new_config = @static_config - else - filename = config_filename - new_config = load_config filename - end - apply_environment new_config, environment - end - - def apply_environment(config, environment) - environment and config[environment] and config.merge!(config[environment]) - config.delete_if {|key, value| value.is_a? Hash } - end - - def config_filename - @filename || choose_config_file - end - - def load_config(filename) - return {} unless filename - YAML.load(ERB.new(IO.read(filename)).result) - end - - CONFIG_FILES = ["resque-pool.yml", "config/resque-pool.yml"] - def choose_config_file - if ENV["RESQUE_POOL_CONFIG"] - ENV["RESQUE_POOL_CONFIG"] - else - CONFIG_FILES.detect { |f| File.exist?(f) } - end - end - end - end -end diff --git a/lib/resque/pool/pooled_worker.rb b/lib/resque/pool/pooled_worker.rb index f1281c8..8c87f80 100644 --- a/lib/resque/pool/pooled_worker.rb +++ b/lib/resque/pool/pooled_worker.rb @@ -33,9 +33,13 @@ def self.included(base) end end - end -end + def self.monkey_patch_resque_worker! + return if @patched_once + Resque::Worker.class_eval do + include Resque::Pool::PooledWorker + end + @patched_once = true + end -Resque::Worker.class_eval do - include Resque::Pool::PooledWorker + end end diff --git a/spec/config_loaders/redis_spec.rb b/spec/config_loaders/redis_spec.rb new file mode 100644 index 0000000..6713c00 --- /dev/null +++ b/spec/config_loaders/redis_spec.rb @@ -0,0 +1,110 @@ +require 'spec_helper' +require 'resque/pool/config_loaders/redis' + +module Resque::Pool::ConfigLoaders + + describe Redis do + before(:each) do + Resque.redis.flushdb + expect(Resque.redis.keys.count).to eq(0) + end + + after(:all) do + Resque.redis.flushdb + end + + subject(:config) { Redis.new(environment: env) } + subject(:env) { "prd" } + + describe "initialization" do + it "uses default app_name and pool_name from Resque::Pool" do + expect(Redis.new.app).to eq(Resque::Pool.app_name) + expect(Redis.new.pool).to eq(Resque::Pool.pool_name) + end + it "uses default 'unknown' environment" do + expect(Redis.new.env).to eq("unknown") + end + it "uses default 'config' name" do + expect(Redis.new.name).to eq("config") + end + it "constructs redis key (expecting to be namespaced under resque)" do + config = Redis.new(app_name: "foo", + pool_name: "bar", + environment: "dev", + config_name: "override") + expect(config.key).to eq("pool:config:foo:bar:dev:override") + end + it "uses resque's redis connection (probably namespaced)" do + expect(Redis.new.redis).to eq(Resque.redis) + expect(Redis.new(redis: :another).redis).to eq(:another) + end + end + + describe "basic API" do + + it "starts out empty" do + expect(config.call(env)).to eq({}) + end + + it "has hash-like index setters" do + config["foo"] = 2 + config["bar"] = 3 + config["numbers_only"] = "elephant" + expect(config.call(env)).to eq({ + "foo" => 2, + "bar" => 3, + "numbers_only" => 0, + }) + end + + it "has indifferent access (but call returns string keys)" do + config[:foo] = 1 + config["foo"] = 2 + expect(config[:foo]).to eq(2) + expect(config.call(env)).to eq("foo" => 2) + end + + it "has hash-like index getters" do + config["foo"] = 86 + config["bar"] = 99 + expect(config["foo"]).to eq(86) + expect(config["bar"]).to eq(99) + expect(config["nonexistent"]).to eq(0) + end + + it "can remove keys (not just set them to zero)" do + config["foo"] = 99 + config["bar"] = 7 + expect(config.delete("foo")).to eq(99) + expect(config.call(env)).to eq("bar" => 7) + end + + end + + describe "persistance" do + + it "can be loaded from another instance" do + config["qA"] = 24 + config["qB"] = 33 + config2 = Redis.new environment: env + expect(config2.call(env)).to eq("qA" => 24, "qB" => 33) + end + + it "won't clash with different configs" do + config[:foo] = 1 + config[:bar] = 2 + config2 = Redis.new app_name: "another" + expect(config2.call(env)).to eq({}) + config3 = Redis.new pool_name: "another" + expect(config3.call(env)).to eq({}) + config4 = Redis.new config_name: "another" + expect(config4.call(env)).to eq({}) + config5 = Redis.new environment: "another" + expect(config5.call(env)).to eq({}) + end + + end + + end + +end diff --git a/spec/config_loaders/throttled_spec.rb b/spec/config_loaders/throttled_spec.rb new file mode 100644 index 0000000..2f2bf12 --- /dev/null +++ b/spec/config_loaders/throttled_spec.rb @@ -0,0 +1,147 @@ +require 'spec_helper' +require 'resque/pool/config_loaders/throttled' + +module Resque::Pool::ConfigLoaders + + describe Throttled do + let(:fake_time) { FakeTime.new 1445898807 } + + it "returns the config returned by the wrapped config loader for given env" do + wrapped_config = { + "dev" => {"qA,qB" => 1}, + "prd" => {"qA,qB" => 4} + } + wrapped_loader = lambda {|env| wrapped_config[env] } + throttle = Throttled.new(wrapped_loader) + + throttle.call("prd").should eq({"qA,qB" => 4}) + end + + it "does not call wrapped loader again until the default period of time has elapsed" do + wrapped_loader = TestConfigLoader.new + wrapped_loader.configuration = {"qA,qB" => 1} + + throttle = Throttled.new(wrapped_loader, time_source: fake_time) + first_call = throttle.call("prd") + + new_config = {"qA,qB" => 22} + wrapped_loader.configuration = new_config + fake_time.advance_time 6 + # config changed, but not enough time elapsed + + second_call = throttle.call("prd") + + second_call.should eq(first_call) + wrapped_loader.times_called.should == 1 + + fake_time.advance_time 6 + # now, enough time has elapsed to retrieve latest config + + third_call = throttle.call("prd") + + third_call.should_not eq(first_call) + third_call.should eq(new_config) + wrapped_loader.times_called.should == 2 + + # further calls continue to use cached value + throttle.call("prd") + throttle.call("prd") + throttle.call("prd") + wrapped_loader.times_called.should == 2 + end + + it "can specify an alternate cache period" do + config0 = {foo: 2, bar: 1} + config1 = {bar: 3, baz: 9} + config2 = {foo: 4, quux: 1} + wrapped_loader = TestConfigLoader.new + wrapped_loader.configuration = config0 + throttle = Throttled.new( + wrapped_loader, period: 60, time_source: fake_time + ) + throttle.call("prd").should eq(config0) + wrapped_loader.configuration = config1 + fake_time.advance_time 59 + throttle.call("prd").should eq(config0) + fake_time.advance_time 5 + throttle.call("prd").should eq(config1) + wrapped_loader.configuration = config2 + fake_time.advance_time 59 + throttle.call("prd").should eq(config1) + fake_time.advance_time 2 + throttle.call("prd").should eq(config2) + end + + it "forces a call to the wrapperd loader after reset! called, even if required time hasn't elapsed" do + wrapped_loader = TestConfigLoader.new + wrapped_loader.configuration = {"qA,qB" => 1} + + throttle = Throttled.new(wrapped_loader, time_source: fake_time) + throttle.call("prd") + + new_config = {"qA,qB" => 22} + wrapped_loader.configuration = new_config + fake_time.advance_time 6 + # the 10 second period has *not* elapsed + + throttle.reset! + + second_call = throttle.call("prd") + + second_call.should eq(new_config) + wrapped_loader.times_called.should == 2 + end + + it "delegates reset! to the wrapped_loader, when supported" do + wrapped_loader = TestConfigLoader.new + throttle = Throttled.new(wrapped_loader) + + wrapped_loader.times_reset.should == 0 + throttle.reset! + wrapped_loader.times_reset.should == 1 + end + + it "does not delegate reset! to the wrapped_loader when it is not supported" do + wrapped_loader = lambda {|env| Hash.new } + throttle = Throttled.new(wrapped_loader) + + expect { + throttle.reset! + }.to_not raise_error + end + + class TestConfigLoader + attr_accessor :configuration + attr_reader :times_called + attr_reader :times_reset + + def initialize + @times_called = 0 + @times_reset = 0 + end + + def call(env) + @times_called += 1 + configuration + end + + def reset! + @times_reset += 1 + end + end + + class FakeTime + attr_reader :now + + def initialize(start_time) + @now = start_time + end + + def advance_time(seconds) + @now += seconds + end + end + + end + +end diff --git a/spec/resque_pool_spec.rb b/spec/resque_pool_spec.rb index 405c345..365188a 100644 --- a/spec/resque_pool_spec.rb +++ b/spec/resque_pool_spec.rb @@ -1,16 +1,5 @@ require 'spec_helper' -RSpec.configure do |config| - config.include PoolSpecHelpers - config.after { - Object.send(:remove_const, :RAILS_ENV) if defined? RAILS_ENV - ENV.delete 'RACK_ENV' - ENV.delete 'RAILS_ENV' - ENV.delete 'RESQUE_ENV' - ENV.delete 'RESQUE_POOL_CONFIG' - } -end - describe Resque::Pool, "when loading a simple pool configuration" do let(:config) do { 'foo' => 1, 'bar' => 2, 'foo,bar' => 3, 'bar,foo' => 4, } @@ -248,7 +237,9 @@ module Rails; end subject { Resque::Pool.create_configured } it "created pools use config file and hash loading logic" do - subject.config_loader.should be_instance_of Resque::Pool::FileOrHashLoader + subject.config_loader.should be_instance_of( + Resque::Pool::ConfigLoaders::FileOrHashLoader + ) end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 6613447..2ae2478 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -13,3 +13,15 @@ def simulate_signal(pool, signal) pool.handle_sig_queue! end end + +RSpec.configure do |config| + config.include PoolSpecHelpers + config.after { + Object.send(:remove_const, :RAILS_ENV) if defined? RAILS_ENV + ENV.delete 'RACK_ENV' + ENV.delete 'RAILS_ENV' + ENV.delete 'RESQUE_ENV' + ENV.delete 'RESQUE_POOL_CONFIG' + } +end +