From c40ff4f1c89f5690a1855f09a577505692cc0e7c Mon Sep 17 00:00:00 2001 From: Andrei Horak Date: Fri, 24 Jan 2020 12:07:38 +0100 Subject: [PATCH 1/6] Initial attempt at connection pooling --- lib/sqewer.rb | 8 ++++++++ lib/sqewer/connection_messagebox.rb | 18 ++++++++++-------- lib/sqewer/submitter.rb | 8 +++++--- lib/sqewer/worker.rb | 16 +++++++++------- spec/sqewer/submitter_spec.rb | 20 ++++++++++++++------ spec/sqewer/worker_spec.rb | 10 +++++++--- sqewer.gemspec | 1 + 7 files changed, 54 insertions(+), 27 deletions(-) diff --git a/lib/sqewer.rb b/lib/sqewer.rb index dfe5bbe..17c3744 100644 --- a/lib/sqewer.rb +++ b/lib/sqewer.rb @@ -1,3 +1,5 @@ +require 'connection_pool' + # The enclosing module for the library module Sqewer # Eager-load everything except extensions. Sort to ensure @@ -24,6 +26,12 @@ def self.submit!(*jobs, **options) Sqewer::Submitter.default.submit!(*jobs, **options) end + def self.connection_pool + @connection_pool ||= ConnectionPool.new(timeout: 5, size: 20) do + Sqewer::Connection.default + end + end + # If we are within Rails, load the railtie require_relative 'sqewer/extensions/railtie' if defined?(Rails) diff --git a/lib/sqewer/connection_messagebox.rb b/lib/sqewer/connection_messagebox.rb index 0b562ec..520790c 100644 --- a/lib/sqewer/connection_messagebox.rb +++ b/lib/sqewer/connection_messagebox.rb @@ -17,8 +17,8 @@ # a messagebox will be able to buffer those sends and pack them in batches, # consequently performing less requests class Sqewer::ConnectionMessagebox - def initialize(connection) - @connection = connection + def initialize(connection_pool) + @connection_pool = connection_pool @deletes = [] @sends = [] @mux = Mutex.new @@ -49,14 +49,16 @@ def delete_message(message_identifier) # All of those will use batching where possible. def flush! @mux.synchronize do - @connection.send_multiple_messages do | buffer | - @sends.each { |body, kwargs| buffer.send_message(body, **kwargs) } - end + @connection_pool.with do |connection| + connection.send_multiple_messages do | buffer | + @sends.each { |body, kwargs| buffer.send_message(body, **kwargs) } + end - @connection.delete_multiple_messages do | buffer | - @deletes.each { |id| buffer.delete_message(id) } + connection.delete_multiple_messages do | buffer | + @deletes.each { |id| buffer.delete_message(id) } + end + (@sends.length + @deletes.length).tap{ @sends.clear; @deletes.clear } end - (@sends.length + @deletes.length).tap{ @sends.clear; @deletes.clear } end end end diff --git a/lib/sqewer/submitter.rb b/lib/sqewer/submitter.rb index c37788e..7ff8cb0 100644 --- a/lib/sqewer/submitter.rb +++ b/lib/sqewer/submitter.rb @@ -2,13 +2,13 @@ # (something that responds to `#send_message`) # and the serializer (something that responds to `#serialize`) to # convert the job into the string that will be put in the queue. -class Sqewer::Submitter < Struct.new(:connection, :serializer) +class Sqewer::Submitter < Struct.new(:connection_pool, :serializer) NotSqewerJob = Class.new(StandardError) # Returns a default Submitter, configured with the default connection # and the default serializer. def self.default - new(Sqewer::Connection.default, Sqewer::Serializer.default) + new(Sqewer.connection_pool, Sqewer::Serializer.default) end def submit!(job, **kwargs_for_send) @@ -21,7 +21,9 @@ def submit!(job, **kwargs_for_send) else serializer.serialize(job) end - connection.send_message(message_body, **kwargs_for_send) + connection_pool.with do |connection| + connection.send_message(message_body, **kwargs_for_send) + end end private diff --git a/lib/sqewer/worker.rb b/lib/sqewer/worker.rb index 8b8e204..ffd3df4 100644 --- a/lib/sqewer/worker.rb +++ b/lib/sqewer/worker.rb @@ -13,7 +13,7 @@ class Sqewer::Worker attr_reader :logger # @return [Sqewer::Connection] The connection for sending and receiving messages - attr_reader :connection + attr_reader :connection_pool # @return [Sqewer::Serializer] The serializer for unmarshalling and marshalling attr_reader :serializer @@ -48,13 +48,13 @@ def self.default # # @param connection[Sqewer::Connection] the object that handles polling and submitting # @param serializer[#serialize, #unserialize] the serializer/unserializer for the jobs - # @param execution_context_class[Class] the class for the execution context (will be instantiated by + # @param execution_context_class[Class] the class for the execution context (will be instantiated by # the worker for each job execution) # @param submitter_class[Class] the class used for submitting jobs (will be instantiated by the worker for each job execution) # @param middleware_stack[Sqewer::MiddlewareStack] the middleware stack that is going to be used # @param logger[Logger] the logger to log execution to and to pass to the jobs # @param num_threads[Fixnum] how many worker threads to spawn - def initialize(connection: Sqewer::Connection.default, + def initialize(connection_pool: Sqewer.connection_pool, serializer: Sqewer::Serializer.default, execution_context_class: Sqewer::ExecutionContext, submitter_class: Sqewer::Submitter, @@ -63,7 +63,7 @@ def initialize(connection: Sqewer::Connection.default, num_threads: DEFAULT_NUM_THREADS) @logger = logger - @connection = connection + @connection_pool = connection_pool @serializer = serializer @middleware_stack = middleware_stack @execution_context_class = execution_context_class @@ -104,7 +104,9 @@ def start break if stopping? if queue_has_capacity? - messages = @connection.receive_messages + messages = connection_pool.with do |connection| + connection.receive_messages + end if messages.any? messages.each {|m| @execution_queue << m } @logger.debug { "[worker] Received and buffered %d messages" % messages.length } if messages.any? @@ -201,7 +203,7 @@ def handle_message(message) # we can send out those commands in one go (without interfering with senders # on other threads, as it seems the Aws::SQS::Client is not entirely # thread-safe - or at least not it's HTTP client part). - box = Sqewer::ConnectionMessagebox.new(connection) + box = Sqewer::ConnectionMessagebox.new(connection_pool) return box.delete_message(message.receipt_handle) unless message.has_body? job = middleware_stack.around_deserialization(serializer, message.receipt_handle, message.body, message.attributes) do @@ -241,7 +243,7 @@ def perform(message) # we can send out those commands in one go (without interfering with senders # on other threads, as it seems the Aws::SQS::Client is not entirely # thread-safe - or at least not it's HTTP client part). - box = Sqewer::ConnectionMessagebox.new(connection) + box = Sqewer::ConnectionMessagebox.new(connection_pool) job = middleware_stack.around_deserialization(serializer, message.receipt_handle, message.body, message.attributes) do serializer.unserialize(message.body) diff --git a/spec/sqewer/submitter_spec.rb b/spec/sqewer/submitter_spec.rb index ba96d11..d6c122b 100644 --- a/spec/sqewer/submitter_spec.rb +++ b/spec/sqewer/submitter_spec.rb @@ -3,10 +3,10 @@ describe Sqewer::Submitter do describe '.default' do it 'returns a set up Submitter with the configured Connection and Serializer' do - expect(ENV).to receive(:fetch).with('SQS_QUEUE_URL').and_return('https://some-queue.aws.com') + # expect(ENV).to receive(:fetch).with('SQS_QUEUE_URL').and_return('https://some-queue.aws.com') s = described_class.default - expect(s.connection).to respond_to(:send_message) + expect(s.connection_pool).to respond_to(:with) expect(s.serializer).to respond_to(:serialize) end end @@ -21,10 +21,12 @@ fake_connection = double('Some SQS connection') expect(fake_connection).to receive(:send_message).at_least(5).times.with('serialized-object-data', any_args) + fake_connection_pool = double('Fake pool') + allow(fake_connection_pool).to receive(:with).and_yield(fake_connection) fake_job = double('Some job', run: true) - subject = described_class.new(fake_connection, fake_serializer) + subject = described_class.new(fake_connection_pool, fake_serializer) 5.times { subject.submit!(fake_job) } end @@ -37,10 +39,12 @@ fake_connection = double('Some SQS connection') expect(fake_connection).to receive(:send_message).with('serialized-object-data', {delay_seconds: 5}) + fake_connection_pool = double('Fake pool') + allow(fake_connection_pool).to receive(:with).and_yield(fake_connection) fake_job = double('Some job', run: true) - subject = described_class.new(fake_connection, fake_serializer) + subject = described_class.new(fake_connection_pool, fake_serializer) subject.submit!(fake_job, delay_seconds: 5) end @@ -57,19 +61,23 @@ fake_connection = double('Some SQS connection') expect(fake_connection).to receive(:send_message).with('serialized-object-data', {delay_seconds: 899}) + fake_connection_pool = double('Fake pool') + allow(fake_connection_pool).to receive(:with).and_yield(fake_connection) fake_job = double('Some job', run: true) - subject = described_class.new(fake_connection, fake_serializer) + subject = described_class.new(fake_connection_pool, fake_serializer) subject.submit!(fake_job, delay_seconds: 4585659855) end it "raises an error if the job does not respond to call" do fake_serializer = double('Some serializer') fake_connection = double('Some SQS connection') + fake_connection_pool = double('Fake pool') + allow(fake_connection_pool).to receive(:with).and_yield(fake_connection) fake_job = double('Some job') - subject = described_class.new(fake_connection, fake_serializer) + subject = described_class.new(fake_connection_pool, fake_serializer) expect { subject.submit!(fake_job, delay_seconds: 5) }.to raise_error(Sqewer::Submitter::NotSqewerJob) diff --git a/spec/sqewer/worker_spec.rb b/spec/sqewer/worker_spec.rb index d98554b..602430f 100644 --- a/spec/sqewer/worker_spec.rb +++ b/spec/sqewer/worker_spec.rb @@ -7,7 +7,7 @@ } it 'has all the necessary attributes' do - attrs = [:logger, :connection, :serializer, :middleware_stack, + attrs = [:logger, :connection_pool, :serializer, :middleware_stack, :execution_context_class, :submitter_class, :num_threads] default_worker = described_class.default attrs.each do | attr_name | @@ -53,7 +53,9 @@ allow(fake_conn).to receive(:receive_messages) { loop { sleep 0.5 } } - worker = described_class.new(logger: test_logger, connection: fake_conn) + fake_conn_pool = double('Fake pool') + allow(fake_conn_pool).to receive(:with).and_yield(fake_conn) + worker = described_class.new(logger: test_logger, connection_pool: fake_conn_pool) worker.start sleep 2 worker.stop @@ -64,9 +66,11 @@ it 'it stops itself gracefully' do fake_conn = double('bad connection') allow(fake_conn).to receive(:receive_messages).and_raise("boom") + fake_conn_pool = double('Fake pool') + allow(fake_conn_pool).to receive(:with).and_yield(fake_conn) log_device = StringIO.new('') - worker = described_class.new(logger: Logger.new(log_device), connection: fake_conn) + worker = described_class.new(logger: Logger.new(log_device), connection_pool: fake_conn_pool) expect(worker).to receive(:stop).at_least(:once) expect(worker).not_to receive(:kill) diff --git a/sqewer.gemspec b/sqewer.gemspec index bed86b5..5b602c9 100644 --- a/sqewer.gemspec +++ b/sqewer.gemspec @@ -30,6 +30,7 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.add_runtime_dependency "aws-sdk-sqs", "~> 1" + spec.add_runtime_dependency "connection_pool" spec.add_runtime_dependency "rack" spec.add_runtime_dependency "very_tiny_state_machine" spec.add_runtime_dependency "ks" From 8d781303048570b1ff32e587cc72d6338593eb2f Mon Sep 17 00:00:00 2001 From: Andrei Horak Date: Thu, 13 Feb 2020 12:12:35 +0100 Subject: [PATCH 2/6] Use ConnectionPool defaults --- lib/sqewer.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sqewer.rb b/lib/sqewer.rb index 17c3744..dc1bfe1 100644 --- a/lib/sqewer.rb +++ b/lib/sqewer.rb @@ -27,7 +27,7 @@ def self.submit!(*jobs, **options) end def self.connection_pool - @connection_pool ||= ConnectionPool.new(timeout: 5, size: 20) do + @connection_pool ||= ConnectionPool.new do Sqewer::Connection.default end end From 4fe1f0bbc34339ce69c7a9c346ba9ee437ed488a Mon Sep 17 00:00:00 2001 From: Andrei Horak Date: Thu, 13 Feb 2020 13:21:04 +0100 Subject: [PATCH 3/6] Rename to Sqewer.default_connection_pool --- lib/sqewer.rb | 4 ++-- lib/sqewer/submitter.rb | 2 +- lib/sqewer/worker.rb | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/sqewer.rb b/lib/sqewer.rb index dc1bfe1..a1882ce 100644 --- a/lib/sqewer.rb +++ b/lib/sqewer.rb @@ -26,8 +26,8 @@ def self.submit!(*jobs, **options) Sqewer::Submitter.default.submit!(*jobs, **options) end - def self.connection_pool - @connection_pool ||= ConnectionPool.new do + def self.default_connection_pool + @default_connection_pool ||= ConnectionPool.new do Sqewer::Connection.default end end diff --git a/lib/sqewer/submitter.rb b/lib/sqewer/submitter.rb index 7ff8cb0..569c52b 100644 --- a/lib/sqewer/submitter.rb +++ b/lib/sqewer/submitter.rb @@ -8,7 +8,7 @@ class Sqewer::Submitter < Struct.new(:connection_pool, :serializer) # Returns a default Submitter, configured with the default connection # and the default serializer. def self.default - new(Sqewer.connection_pool, Sqewer::Serializer.default) + new(Sqewer.default_connection_pool, Sqewer::Serializer.default) end def submit!(job, **kwargs_for_send) diff --git a/lib/sqewer/worker.rb b/lib/sqewer/worker.rb index ffd3df4..0213a42 100644 --- a/lib/sqewer/worker.rb +++ b/lib/sqewer/worker.rb @@ -54,7 +54,7 @@ def self.default # @param middleware_stack[Sqewer::MiddlewareStack] the middleware stack that is going to be used # @param logger[Logger] the logger to log execution to and to pass to the jobs # @param num_threads[Fixnum] how many worker threads to spawn - def initialize(connection_pool: Sqewer.connection_pool, + def initialize(connection_pool: Sqewer.default_connection_pool, serializer: Sqewer::Serializer.default, execution_context_class: Sqewer::ExecutionContext, submitter_class: Sqewer::Submitter, From b7795dda9b0309629ad4cebfc3274580fba484a8 Mon Sep 17 00:00:00 2001 From: Andrei Horak Date: Thu, 13 Feb 2020 16:37:52 +0100 Subject: [PATCH 4/6] Reset connection pool between tests, due to ENV mutation --- lib/sqewer.rb | 6 ++++++ spec/spec_helper.rb | 1 + 2 files changed, 7 insertions(+) diff --git a/lib/sqewer.rb b/lib/sqewer.rb index a1882ce..0027bc9 100644 --- a/lib/sqewer.rb +++ b/lib/sqewer.rb @@ -32,6 +32,12 @@ def self.default_connection_pool end end + def self.reset_default_connection_pool! + @default_connection_pool = ConnectionPool.new do + Sqewer::Connection.default + end + end + # If we are within Rails, load the railtie require_relative 'sqewer/extensions/railtie' if defined?(Rails) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index c04ce82..54846b8 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -28,6 +28,7 @@ client.delete_queue(queue_url: ENV.fetch('SQS_QUEUE_URL')) rescue Aws::SQS::Errors::NonExistentQueue ENV.delete('SQS_QUEUE_URL') + Sqewer.reset_default_connection_pool! else example.run end From 088522afc8288c148999d328c15c0d1de02d9e76 Mon Sep 17 00:00:00 2001 From: Andrei Horak Date: Thu, 13 Feb 2020 16:38:22 +0100 Subject: [PATCH 5/6] Differentiate between pools and message boxes in submitters --- lib/sqewer/submitter.rb | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/sqewer/submitter.rb b/lib/sqewer/submitter.rb index 569c52b..ec4a4a7 100644 --- a/lib/sqewer/submitter.rb +++ b/lib/sqewer/submitter.rb @@ -21,8 +21,12 @@ def submit!(job, **kwargs_for_send) else serializer.serialize(job) end - connection_pool.with do |connection| - connection.send_message(message_body, **kwargs_for_send) + if connection_pool.is_a?(Sqewer::ConnectionMessagebox) + connection_pool.send_message(message_body, **kwargs_for_send) + else + connection_pool.with do |connection| + connection.send_message(message_body, **kwargs_for_send) + end end end From 42b087ed39a7d4ada25749f5fc1b0fde3db99156 Mon Sep 17 00:00:00 2001 From: Andrei Horak Date: Thu, 13 Feb 2020 16:38:34 +0100 Subject: [PATCH 6/6] Update inline docs --- lib/sqewer/worker.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/sqewer/worker.rb b/lib/sqewer/worker.rb index 0213a42..e880618 100644 --- a/lib/sqewer/worker.rb +++ b/lib/sqewer/worker.rb @@ -12,7 +12,7 @@ class Sqewer::Worker # @return [Logger] The logger used for job execution attr_reader :logger - # @return [Sqewer::Connection] The connection for sending and receiving messages + # @return [ConnectionPool] The pool of connections for sending and receiving messages attr_reader :connection_pool # @return [Sqewer::Serializer] The serializer for unmarshalling and marshalling @@ -46,7 +46,7 @@ def self.default # Creates a new Worker. The Worker, unlike it is in the Rails tradition, is only responsible for # the actual processing of jobs, and not for the job arguments. # - # @param connection[Sqewer::Connection] the object that handles polling and submitting + # @param connection_pool[ConnectionPool] pool of objects that handle polling and submitting # @param serializer[#serialize, #unserialize] the serializer/unserializer for the jobs # @param execution_context_class[Class] the class for the execution context (will be instantiated by # the worker for each job execution)