Skip to content

Commit

Permalink
add concurrent execution to consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
desheikh committed May 5, 2024
1 parent d60b235 commit 3c9ec54
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 41 deletions.
1 change: 1 addition & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ PATH
remote: .
specs:
eventsimple (1.4.2)
concurrent-ruby (>= 1.2.3)
dry-struct (~> 1.6)
dry-types (~> 1.7)
pg (~> 1.4)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ Generate migration to setup the outbox cursor table. This table is used to track
```

Create a consummer and processor class for the outbox.
Note: The presence of the consumer class moves all writes to the respective events table to be written using an advisory lock.

```ruby
require 'eventsimple/outbox/consumer'
Expand All @@ -304,6 +303,7 @@ module UserComponent
identitfier 'UserComponent::Consumer'
consumes_event UserEvent
processor EventProcessor
concurrency 5
end
end
```
Expand Down
1 change: 1 addition & 0 deletions eventsimple.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Gem::Specification.new do |spec|
spec.add_runtime_dependency 'rails', '~> 7.0'
spec.add_runtime_dependency 'retriable', '~> 3.1'
spec.add_runtime_dependency 'with_advisory_lock', '>= 5.1'
spec.add_runtime_dependency 'concurrent-ruby', '>= 1.2.3'

spec.add_development_dependency 'bundle-audit'
spec.add_development_dependency 'fuubar'
Expand Down
1 change: 1 addition & 0 deletions lib/eventsimple/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Engine < ::Rails::Engine

config.after_initialize do
require 'eventsimple/reactor'
require 'eventsimple/outbox/models/cursor'

verify_dispatchers!

Expand Down
38 changes: 27 additions & 11 deletions lib/eventsimple/outbox/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@
module Eventsimple
module Outbox
module Consumer
class ExitError < StandardError; end

def self.extended(klass)
klass.class_exec do
class_attribute :_event_klass
class_attribute :_identifier
class_attribute :_processor_klass
class_attribute :_processor
class_attribute :_processor_pool
class_attribute :_concurrency, default: 5
class_attribute :stop_consumer, default: false
class_attribute :_identifier
end
end

Expand All @@ -28,7 +31,11 @@ def consumes_event(event_klass)

def processor(processor_klass)
self._processor_klass = processor_klass
self._processor = processor_klass.new
self._processor_pool = _concurrency.times.map { processor_klass.new }
end

def concurrency(concurrency)
self._concurrency = concurrency
end

def start # rubocop:disable Metrics/AbcSize
Expand All @@ -46,28 +53,37 @@ def start # rubocop:disable Metrics/AbcSize

def run_consumer
raise 'Eventsimple: No event class defined' unless _event_klass
raise 'Eventsimple: No processor defined' unless _processor
raise 'Eventsimple: No processor defined' unless _processor_klass
raise 'Eventsimple: No identifier defined' unless _identifier
raise 'Eventsimple: No concurrency defined' unless _concurrency.is_a?(Integer)

Rails.logger.info("Starting consumer for #{_identifier}, processing #{_event_klass} events")
$stdout.puts("Starting consumer for #{_identifier}")

cursor = Outbox::Cursor.fetch(_identifier)

until stop_consumer
_event_klass.unscoped.in_batches(start: cursor + 1, load: true).each do |batch|
batch.each do |event|
_processor.call(event)
grouped_events = batch.group_by { |event| event.aggregate_id.unpack1('L') % _concurrency }

promises = grouped_events.map { |index, events|
Concurrent::Promises.future {
events.each do |event|
_processor_pool[index].call(event)
raise ExitError if stop_consumer
end
}
}

cursor = event.id
break if stop_consumer
end
Concurrent::Promises.zip(*promises).value!

cursor = batch.last.id
Outbox::Cursor.set(_identifier, cursor)
break if stop_consumer
end

sleep(1)
end
rescue ExitError
$stdout.puts("Stopping consumer for #{_identifier}")
end
end
end
Expand Down
84 changes: 56 additions & 28 deletions spec/dummy/spec/components/user_component/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,29 @@
allow(described_class).to receive(:sleep) do
described_class.stop_consumer = true
end
allow(described_class._processor_pool[0]).to receive(:call)
allow(described_class._processor_pool[1]).to receive(:call)
allow(described_class._processor_pool[2]).to receive(:call)
allow(described_class._processor_pool[3]).to receive(:call)
allow(described_class._processor_pool[4]).to receive(:call)
allow(described_class._processor_pool[0]).to receive(:call)
allow(described_class._processor_pool[1]).to receive(:call)
allow(described_class._processor_pool[2]).to receive(:call)
allow(described_class._processor_pool[3]).to receive(:call)
allow(described_class._processor_pool[4]).to receive(:call)
end

after do
described_class.stop_consumer = false
end

let!(:event5_2) { create(:user_event, user: create(:user, canonical_id: '5cd4914b-e03c-4c20-aaf0-a2b9769fd514')) }
let!(:event5) { create(:user_event, user: create(:user, canonical_id: 'f77a5726-f10e-45c6-92a1-62073f1720d1')) }
let!(:event4) { create(:user_event, user: create(:user, canonical_id: '65b0303a-5239-4212-9127-a9dc01658e38')) }
let!(:event3) { create(:user_event, user: create(:user, canonical_id: '9abde676-8a1e-473d-a095-9651ac177b37')) }
let!(:event2) { create(:user_event, user: create(:user, canonical_id: '0e0ce944-8299-4c55-b58e-c48a766b44c4')) }
let!(:event1) { create(:user_event, user: create(:user, canonical_id: '7a5dc301-c982-4871-bd25-a5eadc97113a')) }

it 'has an identifier' do
expect(described_class._identifier).to eq('UserComponent::Consumer')
end
Expand All @@ -30,10 +47,10 @@
described_class._event_klass = UserEvent
end

it 'raises an error when no processor is defined' do
described_class._processor = nil
it 'raises an error when no processor class is defined' do
described_class._processor_klass = nil
expect { run_consumer }.to raise_error(RuntimeError, 'Eventsimple: No processor defined')
described_class._processor = UserComponent::EventProcessor.new
described_class._processor_klass = UserComponent::EventProcessor.new
end

it 'raises an error when no identifier is defined' do
Expand All @@ -44,54 +61,65 @@
end

describe '.run_consumer' do
it 'records the last processed event position' do
event = create(:user_event)

it 'executes processors and records the last processed event position' do
cursor = Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')
expect(cursor).to be(0)

expect(described_class._processor).to receive(:call).once

run_consumer

expect(described_class._processor_pool[0]).to have_received(:call).once
expect(described_class._processor_pool[1]).to have_received(:call).once
expect(described_class._processor_pool[2]).to have_received(:call).once
expect(described_class._processor_pool[3]).to have_received(:call).once
expect(described_class._processor_pool[4]).to have_received(:call).twice

cursor = Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')
expect(cursor).to eq(event.id)
expect(cursor).to eq(event5_2.id)
end

context 'when consumer is stopped inside batch' do
let(:user) { create(:user) }
let!(:events) { create_list(:user_event, 1100, user: user) }
context 'when consumer is stopped while inside batch' do
it 'does not change cursor position' do
allow(described_class._processor_pool[4]).to receive(:call) do |e|
expect(e.id).to be_in([event5.id, event5_2.id])

it 'breaks correctly and sets the cursor to the last processed event position' do
allow(described_class._processor).to receive(:call) do |e|
expect(e.id).to be_in(events[0..1].map(&:id))

# stop consumer after the second event in the batch is processed
if e.id == events[1].id
described_class.stop_consumer = true
end
described_class.stop_consumer = true if e.id == event5_2.id
end

run_consumer

expect(described_class._processor).to have_received(:call).exactly(2).times
expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(events[1].id)
expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(0)
end
end

context 'with an existing cursor' do
let!(:events) { create_list(:user_event, 5) }
context 'when any processor raises an exception' do
it 'does not change cursor position' do
allow(described_class._processor_pool[4]).to receive(:call) do |e|
expect(e.id).to be_in([event5.id, event5_2.id])

raise 'unknown_error' if e.id == event5_2.id
end

expect { run_consumer }.to raise_error(RuntimeError, 'unknown_error')

expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(0)
end
end

context 'with an existing cursor' do
before do
Eventsimple::Outbox::Cursor.set('UserComponent::Consumer', events[2].id)
allow(described_class._processor).to receive(:call)
Eventsimple::Outbox::Cursor.set('UserComponent::Consumer', event2.id)
end

it 'starts after the last processed event position' do
run_consumer

expect(described_class._processor).to have_received(:call).twice
expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(events[4].id)
expect(described_class._processor_pool[0]).not_to have_received(:call)
expect(described_class._processor_pool[1]).not_to have_received(:call)
expect(described_class._processor_pool[2]).to have_received(:call).once
expect(described_class._processor_pool[3]).to have_received(:call).once
expect(described_class._processor_pool[4]).to have_received(:call).twice

expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(event5_2.id)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/dummy/spec/factories/user_event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
FactoryBot.define do
factory :user_event do
user
type { 'UserComponent::Events::Created' }
type { 'Created' }
data {
{
canonical_id: SecureRandom.uuid,
Expand Down

0 comments on commit 3c9ec54

Please sign in to comment.