diff --git a/Gemfile.lock b/Gemfile.lock index d32aaf937..64be587de 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -2,6 +2,7 @@ PATH remote: . specs: eventsimple (1.4.2) + concurrent-ruby (>= 1.2.3) dry-struct (~> 1.6) dry-types (~> 1.7) pg (~> 1.4) diff --git a/eventsimple.gemspec b/eventsimple.gemspec index 45a8f3d82..a2b513fe2 100644 --- a/eventsimple.gemspec +++ b/eventsimple.gemspec @@ -29,6 +29,7 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency 'rails', '~> 7.0' spec.add_runtime_dependency 'retriable', '~> 3.1' spec.add_runtime_dependency 'with_advisory_lock', '>= 5.1' + spec.add_runtime_dependency 'concurrent-ruby', '>= 1.2.3' spec.add_development_dependency 'bundle-audit' spec.add_development_dependency 'fuubar' diff --git a/lib/eventsimple/engine.rb b/lib/eventsimple/engine.rb index 2e0cd9fcf..7b11ec313 100644 --- a/lib/eventsimple/engine.rb +++ b/lib/eventsimple/engine.rb @@ -12,6 +12,7 @@ class Engine < ::Rails::Engine config.after_initialize do require 'eventsimple/reactor' + require 'eventsimple/outbox/models/cursor' verify_dispatchers! diff --git a/lib/eventsimple/outbox/consumer.rb b/lib/eventsimple/outbox/consumer.rb index 82b769768..fe8ab968a 100644 --- a/lib/eventsimple/outbox/consumer.rb +++ b/lib/eventsimple/outbox/consumer.rb @@ -6,11 +6,14 @@ module Eventsimple module Outbox module Consumer + class ExitError < StandardError; end + def self.extended(klass) klass.class_exec do class_attribute :_event_klass class_attribute :_processor_klass - class_attribute :_processor + class_attribute :_processor_pool + class_attribute :_concurrency, default: 5 class_attribute :stop_consumer, default: false class_attribute :_identifier end @@ -28,7 +31,11 @@ def consumes_event(event_klass) def processor(processor_klass) self._processor_klass = processor_klass - self._processor = processor_klass.new + self._processor_pool = _concurrency.times.map { processor_klass.new } + end + + def concurrency(concurrency) + self._concurrency = concurrency end def start # rubocop:disable Metrics/AbcSize @@ -46,28 +53,37 @@ def start # rubocop:disable Metrics/AbcSize def run_consumer raise 'Eventsimple: No event class defined' unless _event_klass - raise 'Eventsimple: No processor defined' unless _processor + raise 'Eventsimple: No processor defined' unless _processor_klass raise 'Eventsimple: No identifier defined' unless _identifier + raise 'Eventsimple: No concurrency defined' unless _concurrency.is_a?(Integer) - Rails.logger.info("Starting consumer for #{_identifier}, processing #{_event_klass} events") + $stdout.puts("Starting consumer for #{_identifier}") cursor = Outbox::Cursor.fetch(_identifier) until stop_consumer _event_klass.unscoped.in_batches(start: cursor + 1, load: true).each do |batch| - batch.each do |event| - _processor.call(event) + grouped_events = batch.group_by{ |event| event.aggregate_id.unpack('L').first % _concurrency} + + promises = grouped_events.map { |index, events| + Concurrent::Promises.future { + events.each do |event| + _processor_pool[index].call(event) + raise ExitError if stop_consumer + end + } + } - cursor = event.id - break if stop_consumer - end + Concurrent::Promises.zip(*promises).value! + cursor = batch.last.id Outbox::Cursor.set(_identifier, cursor) - break if stop_consumer end sleep(1) end + rescue ExitError + $stdout.puts("Stopping consumer for #{_identifier}") end end end diff --git a/spec/dummy/app/components/user_component/event_processor.rb b/spec/dummy/app/components/user_component/event_processor.rb index a221bbf40..0fc0ac6fb 100644 --- a/spec/dummy/app/components/user_component/event_processor.rb +++ b/spec/dummy/app/components/user_component/event_processor.rb @@ -2,6 +2,8 @@ module UserComponent class EventProcessor def call(event) # no-op + # sleep Random.rand(1000)/1000.to_f + puts "Processed event: #{event.id} in object_id: #{self.object_id}" end end end diff --git a/spec/dummy/spec/components/user_component/consumer_spec.rb b/spec/dummy/spec/components/user_component/consumer_spec.rb index 5cd328ae7..9361268c2 100644 --- a/spec/dummy/spec/components/user_component/consumer_spec.rb +++ b/spec/dummy/spec/components/user_component/consumer_spec.rb @@ -30,10 +30,10 @@ described_class._event_klass = UserEvent end - it 'raises an error when no processor is defined' do - described_class._processor = nil + it 'raises an error when no processor class is defined' do + described_class._processor_klass = nil expect { run_consumer }.to raise_error(RuntimeError, 'Eventsimple: No processor defined') - described_class._processor = UserComponent::EventProcessor.new + described_class._processor_klass = UserComponent::EventProcessor.new end it 'raises an error when no identifier is defined' do @@ -43,55 +43,82 @@ end end + let!(:event1) { create(:user_event, user: create(:user, canonical_id: '7a5dc301-c982-4871-bd25-a5eadc97113a')) } + let!(:event2) { create(:user_event, user: create(:user, canonical_id: '0e0ce944-8299-4c55-b58e-c48a766b44c4')) } + let!(:event3) { create(:user_event, user: create(:user, canonical_id: '9abde676-8a1e-473d-a095-9651ac177b37')) } + let!(:event4) { create(:user_event, user: create(:user, canonical_id: '65b0303a-5239-4212-9127-a9dc01658e38')) } + let!(:event5) { create(:user_event, user: create(:user, canonical_id: 'f77a5726-f10e-45c6-92a1-62073f1720d1')) } + let!(:event5_2) { create(:user_event, user: create(:user, canonical_id: '5cd4914b-e03c-4c20-aaf0-a2b9769fd514')) } + + before do + allow(described_class._processor_pool[0]).to receive(:call) + allow(described_class._processor_pool[1]).to receive(:call) + allow(described_class._processor_pool[2]).to receive(:call) + allow(described_class._processor_pool[3]).to receive(:call) + allow(described_class._processor_pool[4]).to receive(:call) + end + describe '.run_consumer' do - it 'records the last processed event position' do - event = create(:user_event) + it 'executes processors and records the last processed event position' do cursor = Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer') expect(cursor).to be(0) - expect(described_class._processor).to receive(:call).once - run_consumer + expect(described_class._processor_pool[0]).to have_received(:call).once + expect(described_class._processor_pool[1]).to have_received(:call).once + expect(described_class._processor_pool[2]).to have_received(:call).once + expect(described_class._processor_pool[3]).to have_received(:call).once + expect(described_class._processor_pool[4]).to have_received(:call).twice + cursor = Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer') - expect(cursor).to eq(event.id) + expect(cursor).to eq(event5_2.id) end - context 'when consumer is stopped inside batch' do - let(:user) { create(:user) } - let!(:events) { create_list(:user_event, 1100, user: user) } + context 'when consumer is stopped while inside batch' do + it 'it does not change cursor position' do + allow(described_class._processor_pool[4]).to receive(:call) do |e| + expect(e.id).to be_in([event5.id, event5_2.id]) - it 'breaks correctly and sets the cursor to the last processed event position' do - allow(described_class._processor).to receive(:call) do |e| - expect(e.id).to be_in(events[0..1].map(&:id)) - - # stop consumer after the second event in the batch is processed - if e.id == events[1].id - described_class.stop_consumer = true - end + described_class.stop_consumer = true if e.id == event5_2.id end run_consumer - expect(described_class._processor).to have_received(:call).exactly(2).times - expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(events[1].id) + expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(0) end end - context 'with an existing cursor' do - let!(:events) { create_list(:user_event, 5) } + context 'when any processor raises an exception' do + it 'it does not change cursor position' do + allow(described_class._processor_pool[4]).to receive(:call) do |e| + expect(e.id).to be_in([event5.id, event5_2.id]) + raise RuntimeError, 'unknown_error' if e.id == event5_2.id + end + + expect { run_consumer }.to raise_error(RuntimeError, 'unknown_error') + + expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(0) + end + end + + context 'with an existing cursor' do before do - Eventsimple::Outbox::Cursor.set('UserComponent::Consumer', events[2].id) - allow(described_class._processor).to receive(:call) + Eventsimple::Outbox::Cursor.set('UserComponent::Consumer', event2.id) end it 'starts after the last processed event position' do run_consumer - expect(described_class._processor).to have_received(:call).twice - expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(events[4].id) + expect(described_class._processor_pool[0]).not_to have_received(:call) + expect(described_class._processor_pool[1]).not_to have_received(:call) + expect(described_class._processor_pool[2]).to have_received(:call).once + expect(described_class._processor_pool[3]).to have_received(:call).once + expect(described_class._processor_pool[4]).to have_received(:call).twice + + expect(Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')).to eq(event5_2.id) end end end diff --git a/spec/dummy/spec/factories/user_event.rb b/spec/dummy/spec/factories/user_event.rb index 7aab161a8..af85472e9 100644 --- a/spec/dummy/spec/factories/user_event.rb +++ b/spec/dummy/spec/factories/user_event.rb @@ -3,7 +3,7 @@ FactoryBot.define do factory :user_event do user - type { 'UserComponent::Events::Created' } + type { 'Created' } data { { canonical_id: SecureRandom.uuid,