Skip to content

Commit

Permalink
add concurrent execution to consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
desheikh committed May 7, 2024
1 parent f80e88c commit 0909e4d
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 43 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

## 1.5.0 - 2024-05-07
### Changed
- The outbox consumer processes event batches concurrently
- Removes unused group_number/group_size config

## 1.4.3 - 2024-05-06
### Changed
- The order of execution for synchronous reactors is now guaranteed to be the order in which they were registered.
Expand Down
3 changes: 2 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
PATH
remote: .
specs:
eventsimple (1.4.3)
eventsimple (1.5.0)
concurrent-ruby (>= 1.2.3)
dry-struct (~> 1.6)
dry-types (~> 1.7)
pg (~> 1.4)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ Generate migration to setup the outbox cursor table. This table is used to track
```

Create a consummer and processor class for the outbox.
Note: The presence of the consumer class moves all writes to the respective events table to be written using an advisory lock.

```ruby
require 'eventsimple/outbox/consumer'
Expand All @@ -304,6 +303,7 @@ module UserComponent
identitfier 'UserComponent::Consumer'
consumes_event UserEvent
processor EventProcessor
concurrency 5
end
end
```
Expand Down
1 change: 1 addition & 0 deletions eventsimple.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Gem::Specification.new do |spec|
spec.add_runtime_dependency 'rails', '~> 7.0'
spec.add_runtime_dependency 'retriable', '~> 3.1'
spec.add_runtime_dependency 'with_advisory_lock', '>= 5.1'
spec.add_runtime_dependency 'concurrent-ruby', '>= 1.2.3'

spec.add_development_dependency 'bundle-audit'
spec.add_development_dependency 'fuubar'
Expand Down
1 change: 1 addition & 0 deletions lib/eventsimple/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Engine < ::Rails::Engine

config.after_initialize do
require 'eventsimple/reactor'
require 'eventsimple/outbox/models/cursor'

verify_dispatchers!

Expand Down
41 changes: 29 additions & 12 deletions lib/eventsimple/outbox/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@
module Eventsimple
module Outbox
module Consumer
class ExitError < StandardError; end

def self.extended(klass)
klass.class_exec do
class_attribute :_event_klass
class_attribute :_identifier
class_attribute :_processor_klass
class_attribute :_processor
class_attribute :_processor_pool
class_attribute :_concurrency, default: 5
class_attribute :_batch_size, default: 1000
class_attribute :stop_consumer, default: false
class_attribute :_identifier
end
end

Expand All @@ -28,7 +32,11 @@ def consumes_event(event_klass)

def processor(processor_klass)
self._processor_klass = processor_klass
self._processor = processor_klass.new
self._processor_pool = _concurrency.times.map { processor_klass.new }
end

def concurrency(concurrency)
self._concurrency = concurrency
end

def start # rubocop:disable Metrics/AbcSize
Expand All @@ -46,28 +54,37 @@ def start # rubocop:disable Metrics/AbcSize

def run_consumer
raise 'Eventsimple: No event class defined' unless _event_klass
raise 'Eventsimple: No processor defined' unless _processor
raise 'Eventsimple: No processor defined' unless _processor_klass
raise 'Eventsimple: No identifier defined' unless _identifier
raise 'Eventsimple: No concurrency defined' unless _concurrency.is_a?(Integer)

Rails.logger.info("Starting consumer for #{_identifier}, processing #{_event_klass} events")
$stdout.puts("Starting consumer for #{_identifier}")

cursor = Outbox::Cursor.fetch(_identifier)

until stop_consumer
_event_klass.unscoped.in_batches(start: cursor + 1, load: true).each do |batch|
batch.each do |event|
_processor.call(event)
_event_klass.unscoped.in_batches(start: cursor + 1, load: true, of: _batch_size).each do |batch|
grouped_events = batch.group_by { |event| event.aggregate_id.unpack1('L') % _concurrency }

promises = grouped_events.map { |index, events|
Concurrent::Promises.future {
events.each do |event|
_processor_pool[index].call(event)
raise ExitError if stop_consumer
end
}
}

cursor = event.id
break if stop_consumer
end
Concurrent::Promises.zip(*promises).value!

cursor = batch.last.id
Outbox::Cursor.set(_identifier, cursor)
break if stop_consumer
end

sleep(1)
end
rescue ExitError
$stdout.puts("Stopping consumer for #{_identifier}")
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/eventsimple/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Eventsimple
VERSION = '1.4.3'
VERSION = '1.5.0'
end
92 changes: 65 additions & 27 deletions spec/dummy/spec/components/user_component/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,24 @@
allow(described_class).to receive(:sleep) do
described_class.stop_consumer = true
end
allow(described_class._processor_pool[0]).to receive(:call)
allow(described_class._processor_pool[1]).to receive(:call)
allow(described_class._processor_pool[2]).to receive(:call)
allow(described_class._processor_pool[3]).to receive(:call)
allow(described_class._processor_pool[4]).to receive(:call)
end

after do
described_class.stop_consumer = false
end

let!(:event1) { create(:user_event, user: create(:user, canonical_id: '7a5dc301-c982-4871-bd25-a5eadc97113a')) }
let!(:event2) { create(:user_event, user: create(:user, canonical_id: '0e0ce944-8299-4c55-b58e-c48a766b44c4')) }
let!(:event3) { create(:user_event, user: create(:user, canonical_id: '9abde676-8a1e-473d-a095-9651ac177b37')) }
let!(:event4) { create(:user_event, user: create(:user, canonical_id: '65b0303a-5239-4212-9127-a9dc01658e38')) }
let!(:event5) { create(:user_event, user: create(:user, canonical_id: 'f77a5726-f10e-45c6-92a1-62073f1720d1')) }
let!(:event5_2) { create(:user_event, user: create(:user, canonical_id: '5cd4914b-e03c-4c20-aaf0-a2b9769fd514')) }

it 'has an identifier' do
expect(described_class._identifier).to eq('UserComponent::Consumer')
end
Expand All @@ -30,10 +42,10 @@
described_class._event_klass = UserEvent
end

it 'raises an error when no processor is defined' do
described_class._processor = nil
it 'raises an error when no processor class is defined' do
described_class._processor_klass = nil
expect { run_consumer }.to raise_error(RuntimeError, 'Eventsimple: No processor defined')
described_class._processor = UserComponent::EventProcessor.new
described_class._processor_klass = UserComponent::EventProcessor.new
end

it 'raises an error when no identifier is defined' do
Expand All @@ -44,54 +56,80 @@
end

describe '.run_consumer' do
it 'records the last processed event position' do
event = create(:user_event)

it 'executes processors and records the last processed event position' do
cursor = Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')
expect(cursor).to be(0)

expect(described_class._processor).to receive(:call).once

run_consumer

expect(described_class._processor_pool[0]).to have_received(:call).once
expect(described_class._processor_pool[1]).to have_received(:call).once
expect(described_class._processor_pool[2]).to have_received(:call).once
expect(described_class._processor_pool[3]).to have_received(:call).once
expect(described_class._processor_pool[4]).to have_received(:call).twice

cursor = Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')
expect(cursor).to eq(event.id)
expect(cursor).to eq(event5_2.id)
end

context 'when consumer is stopped inside batch' do
let(:user) { create(:user) }
let!(:events) { create_list(:user_event, 1100, user: user) }
it 'updates the cursor position after each batch' do
allow(described_class).to receive(:sleep)
described_class._batch_size = 2

expect(Eventsimple::Outbox::Cursor).to receive(:set).with('UserComponent::Consumer', event2.id)
expect(Eventsimple::Outbox::Cursor).to receive(:set).with('UserComponent::Consumer', event4.id)
expect(Eventsimple::Outbox::Cursor).to receive(:set).with('UserComponent::Consumer', event5_2.id) do
described_class.stop_consumer = true
end

run_consumer

described_class._batch_size = 1000
end

it 'breaks correctly and sets the cursor to the last processed event position' do
allow(described_class._processor).to receive(:call) do |e|
expect(e.id).to be_in(events[0..1].map(&:id))
context 'when consumer is stopped while inside batch' do
it 'does not change cursor position' do
allow(described_class._processor_pool[4]).to receive(:call) do |e|
expect(e.id).to be_in([event5.id, event5_2.id])

# stop consumer after the second event in the batch is processed
if e.id == events[1].id
described_class.stop_consumer = true
end
described_class.stop_consumer = true if e.id == event5_2.id
end

run_consumer

expect(described_class._processor).to have_received(:call).exactly(2).times
expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(events[1].id)
expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(0)
end
end

context 'with an existing cursor' do
let!(:events) { create_list(:user_event, 5) }
context 'when any processor raises an exception' do
it 'does not change cursor position' do
allow(described_class._processor_pool[4]).to receive(:call) do |e|
expect(e.id).to be_in([event5.id, event5_2.id])

raise 'unknown_error' if e.id == event5_2.id
end

expect { run_consumer }.to raise_error(RuntimeError, 'unknown_error')

expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(0)
end
end

context 'with an existing cursor' do
before do
Eventsimple::Outbox::Cursor.set('UserComponent::Consumer', events[2].id)
allow(described_class._processor).to receive(:call)
Eventsimple::Outbox::Cursor.set('UserComponent::Consumer', event2.id)
end

it 'starts after the last processed event position' do
run_consumer

expect(described_class._processor).to have_received(:call).twice
expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(events[4].id)
expect(described_class._processor_pool[0]).not_to have_received(:call)
expect(described_class._processor_pool[1]).not_to have_received(:call)
expect(described_class._processor_pool[2]).to have_received(:call).once
expect(described_class._processor_pool[3]).to have_received(:call).once
expect(described_class._processor_pool[4]).to have_received(:call).twice

expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(event5_2.id)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/dummy/spec/factories/user_event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
FactoryBot.define do
factory :user_event do
user
type { 'UserComponent::Events::Created' }
type { 'Created' }
data {
{
canonical_id: SecureRandom.uuid,
Expand Down

0 comments on commit 0909e4d

Please sign in to comment.