Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce SQS connection pooling #52

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions lib/sqewer.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'connection_pool'

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not need to require connection_pool or depend on it - we can provide a NullPool instead (Prorate gem has a similar setup). Less dependencies that way :-) and if we document connection_pool compatibility people may install it themselves

# The enclosing module for the library
module Sqewer
# Eager-load everything except extensions. Sort to ensure
Expand All @@ -24,6 +26,18 @@ def self.submit!(*jobs, **options)
Sqewer::Submitter.default.submit!(*jobs, **options)
end

def self.default_connection_pool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create an accessor pair (client_pool / client_pool=) instead, to adhere by the principle of least surprise? We can then also return a NullPool from the reader if nothing was configured

@default_connection_pool ||= ConnectionPool.new do
Sqewer::Connection.default
end
end

def self.reset_default_connection_pool!
@default_connection_pool = ConnectionPool.new do
Sqewer::Connection.default
end
end

# If we are within Rails, load the railtie
require_relative 'sqewer/extensions/railtie' if defined?(Rails)

Expand Down
18 changes: 10 additions & 8 deletions lib/sqewer/connection_messagebox.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
# a messagebox will be able to buffer those sends and pack them in batches,
# consequently performing less requests
class Sqewer::ConnectionMessagebox
def initialize(connection)
@connection = connection
def initialize(connection_pool)
@connection_pool = connection_pool
@deletes = []
@sends = []
@mux = Mutex.new
Expand Down Expand Up @@ -49,14 +49,16 @@ def delete_message(message_identifier)
# All of those will use batching where possible.
def flush!
@mux.synchronize do
@connection.send_multiple_messages do | buffer |
@sends.each { |body, kwargs| buffer.send_message(body, **kwargs) }
end
@connection_pool.with do |connection|
connection.send_multiple_messages do | buffer |
@sends.each { |body, kwargs| buffer.send_message(body, **kwargs) }
end

@connection.delete_multiple_messages do | buffer |
@deletes.each { |id| buffer.delete_message(id) }
connection.delete_multiple_messages do | buffer |
@deletes.each { |id| buffer.delete_message(id) }
end
(@sends.length + @deletes.length).tap{ @sends.clear; @deletes.clear }
end
(@sends.length + @deletes.length).tap{ @sends.clear; @deletes.clear }
end
end
end
12 changes: 9 additions & 3 deletions lib/sqewer/submitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
# (something that responds to `#send_message`)
# and the serializer (something that responds to `#serialize`) to
# convert the job into the string that will be put in the queue.
class Sqewer::Submitter < Struct.new(:connection, :serializer)
class Sqewer::Submitter < Struct.new(:connection_pool, :serializer)
NotSqewerJob = Class.new(StandardError)

# Returns a default Submitter, configured with the default connection
# and the default serializer.
def self.default
new(Sqewer::Connection.default, Sqewer::Serializer.default)
new(Sqewer.default_connection_pool, Sqewer::Serializer.default)
end

def submit!(job, **kwargs_for_send)
Expand All @@ -21,7 +21,13 @@ def submit!(job, **kwargs_for_send)
else
serializer.serialize(job)
end
connection.send_message(message_body, **kwargs_for_send)
if connection_pool.is_a?(Sqewer::ConnectionMessagebox)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use a pool always, but with a stand-in NullPool when none is configured?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit trickier, because the message box also uses a connection (it acts as a composed object). Given the API around this, this is the best solution I found. If you have something better in mind, I'd be happy to make it work :)

connection_pool.send_message(message_body, **kwargs_for_send)
else
connection_pool.with do |connection|
connection.send_message(message_body, **kwargs_for_send)
end
end
end

private
Expand Down
20 changes: 11 additions & 9 deletions lib/sqewer/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ class Sqewer::Worker
# @return [Logger] The logger used for job execution
attr_reader :logger

# @return [Sqewer::Connection] The connection for sending and receiving messages
attr_reader :connection
# @return [ConnectionPool] The pool of connections for sending and receiving messages
attr_reader :connection_pool

# @return [Sqewer::Serializer] The serializer for unmarshalling and marshalling
attr_reader :serializer
Expand Down Expand Up @@ -46,15 +46,15 @@ def self.default
# Creates a new Worker. The Worker, unlike it is in the Rails tradition, is only responsible for
# the actual processing of jobs, and not for the job arguments.
#
# @param connection[Sqewer::Connection] the object that handles polling and submitting
# @param connection_pool[ConnectionPool] pool of objects that handle polling and submitting
# @param serializer[#serialize, #unserialize] the serializer/unserializer for the jobs
# @param execution_context_class[Class] the class for the execution context (will be instantiated by
# @param execution_context_class[Class] the class for the execution context (will be instantiated by
# the worker for each job execution)
# @param submitter_class[Class] the class used for submitting jobs (will be instantiated by the worker for each job execution)
# @param middleware_stack[Sqewer::MiddlewareStack] the middleware stack that is going to be used
# @param logger[Logger] the logger to log execution to and to pass to the jobs
# @param num_threads[Fixnum] how many worker threads to spawn
def initialize(connection: Sqewer::Connection.default,
def initialize(connection_pool: Sqewer.default_connection_pool,
serializer: Sqewer::Serializer.default,
execution_context_class: Sqewer::ExecutionContext,
submitter_class: Sqewer::Submitter,
Expand All @@ -63,7 +63,7 @@ def initialize(connection: Sqewer::Connection.default,
num_threads: DEFAULT_NUM_THREADS)

@logger = logger
@connection = connection
@connection_pool = connection_pool
@serializer = serializer
@middleware_stack = middleware_stack
@execution_context_class = execution_context_class
Expand Down Expand Up @@ -104,7 +104,9 @@ def start
break if stopping?

if queue_has_capacity?
messages = @connection.receive_messages
messages = connection_pool.with do |connection|
connection.receive_messages
end
if messages.any?
messages.each {|m| @execution_queue << m }
@logger.debug { "[worker] Received and buffered %d messages" % messages.length } if messages.any?
Expand Down Expand Up @@ -201,7 +203,7 @@ def handle_message(message)
# we can send out those commands in one go (without interfering with senders
# on other threads, as it seems the Aws::SQS::Client is not entirely
# thread-safe - or at least not it's HTTP client part).
box = Sqewer::ConnectionMessagebox.new(connection)
box = Sqewer::ConnectionMessagebox.new(connection_pool)
return box.delete_message(message.receipt_handle) unless message.has_body?

job = middleware_stack.around_deserialization(serializer, message.receipt_handle, message.body, message.attributes) do
Expand Down Expand Up @@ -241,7 +243,7 @@ def perform(message)
# we can send out those commands in one go (without interfering with senders
# on other threads, as it seems the Aws::SQS::Client is not entirely
# thread-safe - or at least not it's HTTP client part).
box = Sqewer::ConnectionMessagebox.new(connection)
box = Sqewer::ConnectionMessagebox.new(connection_pool)

job = middleware_stack.around_deserialization(serializer, message.receipt_handle, message.body, message.attributes) do
serializer.unserialize(message.body)
Expand Down
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
client.delete_queue(queue_url: ENV.fetch('SQS_QUEUE_URL')) rescue Aws::SQS::Errors::NonExistentQueue

ENV.delete('SQS_QUEUE_URL')
Sqewer.reset_default_connection_pool!
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have an accessor pair we could do an explicit assignment here instead?

else
example.run
end
Expand Down
20 changes: 14 additions & 6 deletions spec/sqewer/submitter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
describe Sqewer::Submitter do
describe '.default' do
it 'returns a set up Submitter with the configured Connection and Serializer' do
expect(ENV).to receive(:fetch).with('SQS_QUEUE_URL').and_return('https://some-queue.aws.com')

# expect(ENV).to receive(:fetch).with('SQS_QUEUE_URL').and_return('https://some-queue.aws.com')
s = described_class.default
expect(s.connection).to respond_to(:send_message)
expect(s.connection_pool).to respond_to(:with)
expect(s.serializer).to respond_to(:serialize)
end
end
Expand All @@ -21,10 +21,12 @@

fake_connection = double('Some SQS connection')
expect(fake_connection).to receive(:send_message).at_least(5).times.with('serialized-object-data', any_args)
fake_connection_pool = double('Fake pool')
allow(fake_connection_pool).to receive(:with).and_yield(fake_connection)

fake_job = double('Some job', run: true)

subject = described_class.new(fake_connection, fake_serializer)
subject = described_class.new(fake_connection_pool, fake_serializer)
5.times { subject.submit!(fake_job) }
end

Expand All @@ -37,10 +39,12 @@

fake_connection = double('Some SQS connection')
expect(fake_connection).to receive(:send_message).with('serialized-object-data', {delay_seconds: 5})
fake_connection_pool = double('Fake pool')
allow(fake_connection_pool).to receive(:with).and_yield(fake_connection)

fake_job = double('Some job', run: true)

subject = described_class.new(fake_connection, fake_serializer)
subject = described_class.new(fake_connection_pool, fake_serializer)
subject.submit!(fake_job, delay_seconds: 5)
end

Expand All @@ -57,19 +61,23 @@

fake_connection = double('Some SQS connection')
expect(fake_connection).to receive(:send_message).with('serialized-object-data', {delay_seconds: 899})
fake_connection_pool = double('Fake pool')
allow(fake_connection_pool).to receive(:with).and_yield(fake_connection)

fake_job = double('Some job', run: true)

subject = described_class.new(fake_connection, fake_serializer)
subject = described_class.new(fake_connection_pool, fake_serializer)
subject.submit!(fake_job, delay_seconds: 4585659855)
end

it "raises an error if the job does not respond to call" do
fake_serializer = double('Some serializer')
fake_connection = double('Some SQS connection')
fake_connection_pool = double('Fake pool')
allow(fake_connection_pool).to receive(:with).and_yield(fake_connection)
fake_job = double('Some job')

subject = described_class.new(fake_connection, fake_serializer)
subject = described_class.new(fake_connection_pool, fake_serializer)
expect {
subject.submit!(fake_job, delay_seconds: 5)
}.to raise_error(Sqewer::Submitter::NotSqewerJob)
Expand Down
10 changes: 7 additions & 3 deletions spec/sqewer/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
}

it 'has all the necessary attributes' do
attrs = [:logger, :connection, :serializer, :middleware_stack,
attrs = [:logger, :connection_pool, :serializer, :middleware_stack,
:execution_context_class, :submitter_class, :num_threads]
default_worker = described_class.default
attrs.each do | attr_name |
Expand Down Expand Up @@ -53,7 +53,9 @@
allow(fake_conn).to receive(:receive_messages) {
loop { sleep 0.5 }
}
worker = described_class.new(logger: test_logger, connection: fake_conn)
fake_conn_pool = double('Fake pool')
allow(fake_conn_pool).to receive(:with).and_yield(fake_conn)
worker = described_class.new(logger: test_logger, connection_pool: fake_conn_pool)
worker.start
sleep 2
worker.stop
Expand All @@ -64,9 +66,11 @@
it 'it stops itself gracefully' do
fake_conn = double('bad connection')
allow(fake_conn).to receive(:receive_messages).and_raise("boom")
fake_conn_pool = double('Fake pool')
allow(fake_conn_pool).to receive(:with).and_yield(fake_conn)

log_device = StringIO.new('')
worker = described_class.new(logger: Logger.new(log_device), connection: fake_conn)
worker = described_class.new(logger: Logger.new(log_device), connection_pool: fake_conn_pool)

expect(worker).to receive(:stop).at_least(:once)
expect(worker).not_to receive(:kill)
Expand Down
1 change: 1 addition & 0 deletions sqewer.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]

spec.add_runtime_dependency "aws-sdk-sqs", "~> 1"
spec.add_runtime_dependency "connection_pool"
spec.add_runtime_dependency "rack"
spec.add_runtime_dependency "very_tiny_state_machine"
spec.add_runtime_dependency "ks"
Expand Down