diff --git a/lib/sqewer.rb b/lib/sqewer.rb index dfe5bbe..0027bc9 100644 --- a/lib/sqewer.rb +++ b/lib/sqewer.rb @@ -1,3 +1,5 @@ +require 'connection_pool' + # The enclosing module for the library module Sqewer # Eager-load everything except extensions. Sort to ensure @@ -24,6 +26,18 @@ def self.submit!(*jobs, **options) Sqewer::Submitter.default.submit!(*jobs, **options) end + def self.default_connection_pool + @default_connection_pool ||= ConnectionPool.new do + Sqewer::Connection.default + end + end + + def self.reset_default_connection_pool! + @default_connection_pool = ConnectionPool.new do + Sqewer::Connection.default + end + end + # If we are within Rails, load the railtie require_relative 'sqewer/extensions/railtie' if defined?(Rails) diff --git a/lib/sqewer/connection_messagebox.rb b/lib/sqewer/connection_messagebox.rb index 0b562ec..520790c 100644 --- a/lib/sqewer/connection_messagebox.rb +++ b/lib/sqewer/connection_messagebox.rb @@ -17,8 +17,8 @@ # a messagebox will be able to buffer those sends and pack them in batches, # consequently performing less requests class Sqewer::ConnectionMessagebox - def initialize(connection) - @connection = connection + def initialize(connection_pool) + @connection_pool = connection_pool @deletes = [] @sends = [] @mux = Mutex.new @@ -49,14 +49,16 @@ def delete_message(message_identifier) # All of those will use batching where possible. def flush! @mux.synchronize do - @connection.send_multiple_messages do | buffer | - @sends.each { |body, kwargs| buffer.send_message(body, **kwargs) } - end + @connection_pool.with do |connection| + connection.send_multiple_messages do | buffer | + @sends.each { |body, kwargs| buffer.send_message(body, **kwargs) } + end - @connection.delete_multiple_messages do | buffer | - @deletes.each { |id| buffer.delete_message(id) } + connection.delete_multiple_messages do | buffer | + @deletes.each { |id| buffer.delete_message(id) } + end + (@sends.length + @deletes.length).tap{ @sends.clear; @deletes.clear } end - (@sends.length + @deletes.length).tap{ @sends.clear; @deletes.clear } end end end diff --git a/lib/sqewer/submitter.rb b/lib/sqewer/submitter.rb index c37788e..ec4a4a7 100644 --- a/lib/sqewer/submitter.rb +++ b/lib/sqewer/submitter.rb @@ -2,13 +2,13 @@ # (something that responds to `#send_message`) # and the serializer (something that responds to `#serialize`) to # convert the job into the string that will be put in the queue. -class Sqewer::Submitter < Struct.new(:connection, :serializer) +class Sqewer::Submitter < Struct.new(:connection_pool, :serializer) NotSqewerJob = Class.new(StandardError) # Returns a default Submitter, configured with the default connection # and the default serializer. def self.default - new(Sqewer::Connection.default, Sqewer::Serializer.default) + new(Sqewer.default_connection_pool, Sqewer::Serializer.default) end def submit!(job, **kwargs_for_send) @@ -21,7 +21,13 @@ def submit!(job, **kwargs_for_send) else serializer.serialize(job) end - connection.send_message(message_body, **kwargs_for_send) + if connection_pool.is_a?(Sqewer::ConnectionMessagebox) + connection_pool.send_message(message_body, **kwargs_for_send) + else + connection_pool.with do |connection| + connection.send_message(message_body, **kwargs_for_send) + end + end end private diff --git a/lib/sqewer/worker.rb b/lib/sqewer/worker.rb index 8b8e204..e880618 100644 --- a/lib/sqewer/worker.rb +++ b/lib/sqewer/worker.rb @@ -12,8 +12,8 @@ class Sqewer::Worker # @return [Logger] The logger used for job execution attr_reader :logger - # @return [Sqewer::Connection] The connection for sending and receiving messages - attr_reader :connection + # @return [ConnectionPool] The pool of connections for sending and receiving messages + attr_reader :connection_pool # @return [Sqewer::Serializer] The serializer for unmarshalling and marshalling attr_reader :serializer @@ -46,15 +46,15 @@ def self.default # Creates a new Worker. The Worker, unlike it is in the Rails tradition, is only responsible for # the actual processing of jobs, and not for the job arguments. # - # @param connection[Sqewer::Connection] the object that handles polling and submitting + # @param connection_pool[ConnectionPool] pool of objects that handle polling and submitting # @param serializer[#serialize, #unserialize] the serializer/unserializer for the jobs - # @param execution_context_class[Class] the class for the execution context (will be instantiated by + # @param execution_context_class[Class] the class for the execution context (will be instantiated by # the worker for each job execution) # @param submitter_class[Class] the class used for submitting jobs (will be instantiated by the worker for each job execution) # @param middleware_stack[Sqewer::MiddlewareStack] the middleware stack that is going to be used # @param logger[Logger] the logger to log execution to and to pass to the jobs # @param num_threads[Fixnum] how many worker threads to spawn - def initialize(connection: Sqewer::Connection.default, + def initialize(connection_pool: Sqewer.default_connection_pool, serializer: Sqewer::Serializer.default, execution_context_class: Sqewer::ExecutionContext, submitter_class: Sqewer::Submitter, @@ -63,7 +63,7 @@ def initialize(connection: Sqewer::Connection.default, num_threads: DEFAULT_NUM_THREADS) @logger = logger - @connection = connection + @connection_pool = connection_pool @serializer = serializer @middleware_stack = middleware_stack @execution_context_class = execution_context_class @@ -104,7 +104,9 @@ def start break if stopping? if queue_has_capacity? - messages = @connection.receive_messages + messages = connection_pool.with do |connection| + connection.receive_messages + end if messages.any? messages.each {|m| @execution_queue << m } @logger.debug { "[worker] Received and buffered %d messages" % messages.length } if messages.any? @@ -201,7 +203,7 @@ def handle_message(message) # we can send out those commands in one go (without interfering with senders # on other threads, as it seems the Aws::SQS::Client is not entirely # thread-safe - or at least not it's HTTP client part). - box = Sqewer::ConnectionMessagebox.new(connection) + box = Sqewer::ConnectionMessagebox.new(connection_pool) return box.delete_message(message.receipt_handle) unless message.has_body? job = middleware_stack.around_deserialization(serializer, message.receipt_handle, message.body, message.attributes) do @@ -241,7 +243,7 @@ def perform(message) # we can send out those commands in one go (without interfering with senders # on other threads, as it seems the Aws::SQS::Client is not entirely # thread-safe - or at least not it's HTTP client part). - box = Sqewer::ConnectionMessagebox.new(connection) + box = Sqewer::ConnectionMessagebox.new(connection_pool) job = middleware_stack.around_deserialization(serializer, message.receipt_handle, message.body, message.attributes) do serializer.unserialize(message.body) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index c04ce82..54846b8 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -28,6 +28,7 @@ client.delete_queue(queue_url: ENV.fetch('SQS_QUEUE_URL')) rescue Aws::SQS::Errors::NonExistentQueue ENV.delete('SQS_QUEUE_URL') + Sqewer.reset_default_connection_pool! else example.run end diff --git a/spec/sqewer/submitter_spec.rb b/spec/sqewer/submitter_spec.rb index ba96d11..d6c122b 100644 --- a/spec/sqewer/submitter_spec.rb +++ b/spec/sqewer/submitter_spec.rb @@ -3,10 +3,10 @@ describe Sqewer::Submitter do describe '.default' do it 'returns a set up Submitter with the configured Connection and Serializer' do - expect(ENV).to receive(:fetch).with('SQS_QUEUE_URL').and_return('https://some-queue.aws.com') + # expect(ENV).to receive(:fetch).with('SQS_QUEUE_URL').and_return('https://some-queue.aws.com') s = described_class.default - expect(s.connection).to respond_to(:send_message) + expect(s.connection_pool).to respond_to(:with) expect(s.serializer).to respond_to(:serialize) end end @@ -21,10 +21,12 @@ fake_connection = double('Some SQS connection') expect(fake_connection).to receive(:send_message).at_least(5).times.with('serialized-object-data', any_args) + fake_connection_pool = double('Fake pool') + allow(fake_connection_pool).to receive(:with).and_yield(fake_connection) fake_job = double('Some job', run: true) - subject = described_class.new(fake_connection, fake_serializer) + subject = described_class.new(fake_connection_pool, fake_serializer) 5.times { subject.submit!(fake_job) } end @@ -37,10 +39,12 @@ fake_connection = double('Some SQS connection') expect(fake_connection).to receive(:send_message).with('serialized-object-data', {delay_seconds: 5}) + fake_connection_pool = double('Fake pool') + allow(fake_connection_pool).to receive(:with).and_yield(fake_connection) fake_job = double('Some job', run: true) - subject = described_class.new(fake_connection, fake_serializer) + subject = described_class.new(fake_connection_pool, fake_serializer) subject.submit!(fake_job, delay_seconds: 5) end @@ -57,19 +61,23 @@ fake_connection = double('Some SQS connection') expect(fake_connection).to receive(:send_message).with('serialized-object-data', {delay_seconds: 899}) + fake_connection_pool = double('Fake pool') + allow(fake_connection_pool).to receive(:with).and_yield(fake_connection) fake_job = double('Some job', run: true) - subject = described_class.new(fake_connection, fake_serializer) + subject = described_class.new(fake_connection_pool, fake_serializer) subject.submit!(fake_job, delay_seconds: 4585659855) end it "raises an error if the job does not respond to call" do fake_serializer = double('Some serializer') fake_connection = double('Some SQS connection') + fake_connection_pool = double('Fake pool') + allow(fake_connection_pool).to receive(:with).and_yield(fake_connection) fake_job = double('Some job') - subject = described_class.new(fake_connection, fake_serializer) + subject = described_class.new(fake_connection_pool, fake_serializer) expect { subject.submit!(fake_job, delay_seconds: 5) }.to raise_error(Sqewer::Submitter::NotSqewerJob) diff --git a/spec/sqewer/worker_spec.rb b/spec/sqewer/worker_spec.rb index d98554b..602430f 100644 --- a/spec/sqewer/worker_spec.rb +++ b/spec/sqewer/worker_spec.rb @@ -7,7 +7,7 @@ } it 'has all the necessary attributes' do - attrs = [:logger, :connection, :serializer, :middleware_stack, + attrs = [:logger, :connection_pool, :serializer, :middleware_stack, :execution_context_class, :submitter_class, :num_threads] default_worker = described_class.default attrs.each do | attr_name | @@ -53,7 +53,9 @@ allow(fake_conn).to receive(:receive_messages) { loop { sleep 0.5 } } - worker = described_class.new(logger: test_logger, connection: fake_conn) + fake_conn_pool = double('Fake pool') + allow(fake_conn_pool).to receive(:with).and_yield(fake_conn) + worker = described_class.new(logger: test_logger, connection_pool: fake_conn_pool) worker.start sleep 2 worker.stop @@ -64,9 +66,11 @@ it 'it stops itself gracefully' do fake_conn = double('bad connection') allow(fake_conn).to receive(:receive_messages).and_raise("boom") + fake_conn_pool = double('Fake pool') + allow(fake_conn_pool).to receive(:with).and_yield(fake_conn) log_device = StringIO.new('') - worker = described_class.new(logger: Logger.new(log_device), connection: fake_conn) + worker = described_class.new(logger: Logger.new(log_device), connection_pool: fake_conn_pool) expect(worker).to receive(:stop).at_least(:once) expect(worker).not_to receive(:kill) diff --git a/sqewer.gemspec b/sqewer.gemspec index bed86b5..5b602c9 100644 --- a/sqewer.gemspec +++ b/sqewer.gemspec @@ -30,6 +30,7 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.add_runtime_dependency "aws-sdk-sqs", "~> 1" + spec.add_runtime_dependency "connection_pool" spec.add_runtime_dependency "rack" spec.add_runtime_dependency "very_tiny_state_machine" spec.add_runtime_dependency "ks"