Skip to content

Commit

Permalink
add concurrent execution to consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
desheikh committed May 5, 2024
1 parent d60b235 commit a7e20e2
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 38 deletions.
1 change: 1 addition & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ PATH
remote: .
specs:
eventsimple (1.4.2)
concurrent-ruby (>= 1.2.3)
dry-struct (~> 1.6)
dry-types (~> 1.7)
pg (~> 1.4)
Expand Down
1 change: 1 addition & 0 deletions eventsimple.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Gem::Specification.new do |spec|
spec.add_runtime_dependency 'rails', '~> 7.0'
spec.add_runtime_dependency 'retriable', '~> 3.1'
spec.add_runtime_dependency 'with_advisory_lock', '>= 5.1'
spec.add_runtime_dependency 'concurrent-ruby', '>= 1.2.3'

spec.add_development_dependency 'bundle-audit'
spec.add_development_dependency 'fuubar'
Expand Down
1 change: 1 addition & 0 deletions lib/eventsimple/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Engine < ::Rails::Engine

config.after_initialize do
require 'eventsimple/reactor'
require 'eventsimple/outbox/models/cursor'

verify_dispatchers!

Expand Down
36 changes: 26 additions & 10 deletions lib/eventsimple/outbox/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
module Eventsimple
module Outbox
module Consumer
class ExitError < StandardError; end

def self.extended(klass)
klass.class_exec do
class_attribute :_event_klass
class_attribute :_processor_klass
class_attribute :_processor
class_attribute :_processor_pool
class_attribute :_concurrency, default: 5
class_attribute :stop_consumer, default: false
class_attribute :_identifier
end
Expand All @@ -28,7 +31,11 @@ def consumes_event(event_klass)

def processor(processor_klass)
self._processor_klass = processor_klass
self._processor = processor_klass.new
self._processor_pool = _concurrency.times.map { processor_klass.new }
end

def concurrency(concurrency)
self._concurrency = concurrency
end

def start # rubocop:disable Metrics/AbcSize
Expand All @@ -46,28 +53,37 @@ def start # rubocop:disable Metrics/AbcSize

def run_consumer
raise 'Eventsimple: No event class defined' unless _event_klass
raise 'Eventsimple: No processor defined' unless _processor
raise 'Eventsimple: No processor defined' unless _processor_klass
raise 'Eventsimple: No identifier defined' unless _identifier
raise 'Eventsimple: No concurrency defined' unless _concurrency.is_a?(Integer)

Rails.logger.info("Starting consumer for #{_identifier}, processing #{_event_klass} events")
$stdout.puts("Starting consumer for #{_identifier}")

cursor = Outbox::Cursor.fetch(_identifier)

until stop_consumer
_event_klass.unscoped.in_batches(start: cursor + 1, load: true).each do |batch|
batch.each do |event|
_processor.call(event)
grouped_events = batch.group_by{ |event| event.aggregate_id.unpack('L').first % _concurrency}

Check failure on line 66 in lib/eventsimple/outbox/consumer.rb

View workflow job for this annotation

GitHub Actions / Build

[Correctable] Layout/SpaceBeforeBlockBraces: Space missing to the left of {.

Check failure on line 66 in lib/eventsimple/outbox/consumer.rb

View workflow job for this annotation

GitHub Actions / Build

[Correctable] Style/UnpackFirst: Use unpack1('L') instead of unpack('L').first.

Check failure on line 66 in lib/eventsimple/outbox/consumer.rb

View workflow job for this annotation

GitHub Actions / Build

[Correctable] Layout/SpaceInsideBlockBraces: Space missing inside }.

promises = grouped_events.map { |index, events|
Concurrent::Promises.future {
events.each do |event|
_processor_pool[index].call(event)
raise ExitError if stop_consumer
end
}
}

cursor = event.id
break if stop_consumer
end
Concurrent::Promises.zip(*promises).value!

cursor = batch.last.id
Outbox::Cursor.set(_identifier, cursor)
break if stop_consumer
end

sleep(1)
end
rescue ExitError
$stdout.puts("Stopping consumer for #{_identifier}")
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions spec/dummy/app/components/user_component/event_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module UserComponent
class EventProcessor
def call(event)
# no-op
# sleep Random.rand(1000)/1000.to_f
puts "Processed event: #{event.id} in object_id: #{self.object_id}"

Check failure on line 6 in spec/dummy/app/components/user_component/event_processor.rb

View workflow job for this annotation

GitHub Actions / Build

[Correctable] Style/RedundantSelf: Redundant self detected.
end
end
end
81 changes: 54 additions & 27 deletions spec/dummy/spec/components/user_component/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
described_class._event_klass = UserEvent
end

it 'raises an error when no processor is defined' do
described_class._processor = nil
it 'raises an error when no processor class is defined' do
described_class._processor_klass = nil
expect { run_consumer }.to raise_error(RuntimeError, 'Eventsimple: No processor defined')
described_class._processor = UserComponent::EventProcessor.new
described_class._processor_klass = UserComponent::EventProcessor.new
end

it 'raises an error when no identifier is defined' do
Expand All @@ -43,55 +43,82 @@
end
end

let!(:event1) { create(:user_event, user: create(:user, canonical_id: '7a5dc301-c982-4871-bd25-a5eadc97113a')) }

Check failure on line 46 in spec/dummy/spec/components/user_component/consumer_spec.rb

View workflow job for this annotation

GitHub Actions / Build

[Correctable] RSpec/LetBeforeExamples: Move let before the examples in the group.
let!(:event2) { create(:user_event, user: create(:user, canonical_id: '0e0ce944-8299-4c55-b58e-c48a766b44c4')) }

Check failure on line 47 in spec/dummy/spec/components/user_component/consumer_spec.rb

View workflow job for this annotation

GitHub Actions / Build

[Correctable] RSpec/LetBeforeExamples: Move let before the examples in the group.
let!(:event3) { create(:user_event, user: create(:user, canonical_id: '9abde676-8a1e-473d-a095-9651ac177b37')) }

Check failure on line 48 in spec/dummy/spec/components/user_component/consumer_spec.rb

View workflow job for this annotation

GitHub Actions / Build

[Correctable] RSpec/LetBeforeExamples: Move let before the examples in the group.
let!(:event4) { create(:user_event, user: create(:user, canonical_id: '65b0303a-5239-4212-9127-a9dc01658e38')) }

Check failure on line 49 in spec/dummy/spec/components/user_component/consumer_spec.rb

View workflow job for this annotation

GitHub Actions / Build

[Correctable] RSpec/LetBeforeExamples: Move let before the examples in the group.
let!(:event5) { create(:user_event, user: create(:user, canonical_id: 'f77a5726-f10e-45c6-92a1-62073f1720d1')) }

Check failure on line 50 in spec/dummy/spec/components/user_component/consumer_spec.rb

View workflow job for this annotation

GitHub Actions / Build

[Correctable] RSpec/LetBeforeExamples: Move let before the examples in the group.
let!(:event5_2) { create(:user_event, user: create(:user, canonical_id: '5cd4914b-e03c-4c20-aaf0-a2b9769fd514')) }

before do
allow(described_class._processor_pool[0]).to receive(:call)
allow(described_class._processor_pool[1]).to receive(:call)
allow(described_class._processor_pool[2]).to receive(:call)
allow(described_class._processor_pool[3]).to receive(:call)
allow(described_class._processor_pool[4]).to receive(:call)
end

describe '.run_consumer' do
it 'records the last processed event position' do
event = create(:user_event)
it 'executes processors and records the last processed event position' do

cursor = Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')
expect(cursor).to be(0)

expect(described_class._processor).to receive(:call).once

run_consumer

expect(described_class._processor_pool[0]).to have_received(:call).once
expect(described_class._processor_pool[1]).to have_received(:call).once
expect(described_class._processor_pool[2]).to have_received(:call).once
expect(described_class._processor_pool[3]).to have_received(:call).once
expect(described_class._processor_pool[4]).to have_received(:call).twice

cursor = Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')
expect(cursor).to eq(event.id)
expect(cursor).to eq(event5_2.id)
end

context 'when consumer is stopped inside batch' do
let(:user) { create(:user) }
let!(:events) { create_list(:user_event, 1100, user: user) }
context 'when consumer is stopped while inside batch' do
it 'it does not change cursor position' do
allow(described_class._processor_pool[4]).to receive(:call) do |e|
expect(e.id).to be_in([event5.id, event5_2.id])

it 'breaks correctly and sets the cursor to the last processed event position' do
allow(described_class._processor).to receive(:call) do |e|
expect(e.id).to be_in(events[0..1].map(&:id))

# stop consumer after the second event in the batch is processed
if e.id == events[1].id
described_class.stop_consumer = true
end
described_class.stop_consumer = true if e.id == event5_2.id
end

run_consumer

expect(described_class._processor).to have_received(:call).exactly(2).times
expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(events[1].id)
expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(0)
end
end

context 'with an existing cursor' do
let!(:events) { create_list(:user_event, 5) }
context 'when any processor raises an exception' do
it 'it does not change cursor position' do
allow(described_class._processor_pool[4]).to receive(:call) do |e|
expect(e.id).to be_in([event5.id, event5_2.id])

raise RuntimeError, 'unknown_error' if e.id == event5_2.id
end

expect { run_consumer }.to raise_error(RuntimeError, 'unknown_error')

expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(0)
end
end

context 'with an existing cursor' do
before do
Eventsimple::Outbox::Cursor.set('UserComponent::Consumer', events[2].id)
allow(described_class._processor).to receive(:call)
Eventsimple::Outbox::Cursor.set('UserComponent::Consumer', event2.id)
end

it 'starts after the last processed event position' do
run_consumer

expect(described_class._processor).to have_received(:call).twice
expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(events[4].id)
expect(described_class._processor_pool[0]).not_to have_received(:call)
expect(described_class._processor_pool[1]).not_to have_received(:call)
expect(described_class._processor_pool[2]).to have_received(:call).once
expect(described_class._processor_pool[3]).to have_received(:call).once
expect(described_class._processor_pool[4]).to have_received(:call).twice

expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(event5_2.id)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/dummy/spec/factories/user_event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
FactoryBot.define do
factory :user_event do
user
type { 'UserComponent::Events::Created' }
type { 'Created' }
data {
{
canonical_id: SecureRandom.uuid,
Expand Down

0 comments on commit a7e20e2

Please sign in to comment.