diff --git a/lib/eventsimple/event.rb b/lib/eventsimple/event.rb index 14394a49b..4c47fb9c4 100644 --- a/lib/eventsimple/event.rb +++ b/lib/eventsimple/event.rb @@ -15,7 +15,6 @@ def drives_events_for(aggregate_klass, aggregate_id:, events_namespace: nil) self._aggregate_id = aggregate_id class_attribute :_outbox_enabled - class_attribute :_consumer_group_size class_attribute :_on_invalid_transition self._on_invalid_transition = ->(error) { raise error } diff --git a/lib/eventsimple/generators/outbox/templates/create_outbox_cursor.erb b/lib/eventsimple/generators/outbox/templates/create_outbox_cursor.erb index 6e0bd3880..aea702ef0 100644 --- a/lib/eventsimple/generators/outbox/templates/create_outbox_cursor.erb +++ b/lib/eventsimple/generators/outbox/templates/create_outbox_cursor.erb @@ -4,10 +4,9 @@ class CreateEventsimpleOutboxCursor < ActiveRecord::Migration<%= migration_versi def change create_table :eventsimple_outbox_cursors do |t| t.string :event_klass, null: false - t.integer :group_number, null: false t.bigint :cursor, null: false - t.index [:event_klass, :group_number], unique: true + t.index [:event_klass], unique: true end end end diff --git a/lib/eventsimple/outbox/consumer.rb b/lib/eventsimple/outbox/consumer.rb index fe8362f99..82b769768 100644 --- a/lib/eventsimple/outbox/consumer.rb +++ b/lib/eventsimple/outbox/consumer.rb @@ -20,9 +20,8 @@ def identifier(name) self._identifier = name end - def consumes_event(event_klass, group_size: 1) + def consumes_event(event_klass) event_klass._outbox_enabled = true - event_klass._consumer_group_size = group_size self._event_klass = event_klass end @@ -32,7 +31,7 @@ def processor(processor_klass) self._processor = processor_klass.new end - def start(group_number: 0) # rubocop:disable Metrics/AbcSize + def start # rubocop:disable Metrics/AbcSize Signal.trap('INT') do self.stop_consumer = true $stdout.puts('INT received, stopping consumer') @@ -42,17 +41,17 @@ def start(group_number: 0) # rubocop:disable Metrics/AbcSize $stdout.puts('TERM received, stopping consumer') end - run_consumer(group_number: group_number) + run_consumer end - def run_consumer(group_number:) + def run_consumer raise 'Eventsimple: No event class defined' unless _event_klass raise 'Eventsimple: No processor defined' unless _processor raise 'Eventsimple: No identifier defined' unless _identifier - Rails.logger.info("Starting consumer for #{_identifier}, processing #{_event_klass} events with group number #{group_number}") + Rails.logger.info("Starting consumer for #{_identifier}, processing #{_event_klass} events") - cursor = Outbox::Cursor.fetch(_identifier, group_number: group_number) + cursor = Outbox::Cursor.fetch(_identifier) until stop_consumer _event_klass.unscoped.in_batches(start: cursor + 1, load: true).each do |batch| @@ -63,7 +62,7 @@ def run_consumer(group_number:) break if stop_consumer end - Outbox::Cursor.set(_identifier, cursor, group_number: group_number) + Outbox::Cursor.set(_identifier, cursor) break if stop_consumer end diff --git a/lib/eventsimple/outbox/models/cursor.rb b/lib/eventsimple/outbox/models/cursor.rb index 7c3c106ca..63bbd0760 100644 --- a/lib/eventsimple/outbox/models/cursor.rb +++ b/lib/eventsimple/outbox/models/cursor.rb @@ -5,19 +5,18 @@ module Outbox class Cursor < Eventsimple.configuration.parent_record_klass self.table_name = 'eventsimple_outbox_cursors' - def self.fetch(identifier, group_number: 0) - existing = find_by(identifier: identifier.to_s, group_number: group_number) + def self.fetch(identifier) + existing = find_by(identifier: identifier.to_s) existing ? existing.cursor : 0 end - def self.set(identifier, cursor, group_number: 0) + def self.set(identifier, cursor) upsert( { identifier: identifier.to_s, - group_number: group_number, cursor: cursor, }, - unique_by: [:identifier, :group_number], + unique_by: [:identifier], ) end end diff --git a/spec/dummy/db/migrate/20240419175459_create_eventsimple_outbox_cursor.rb b/spec/dummy/db/migrate/20240419175459_create_eventsimple_outbox_cursor.rb index 5a47e6a20..eee11fcf2 100644 --- a/spec/dummy/db/migrate/20240419175459_create_eventsimple_outbox_cursor.rb +++ b/spec/dummy/db/migrate/20240419175459_create_eventsimple_outbox_cursor.rb @@ -4,11 +4,9 @@ class CreateEventsimpleOutboxCursor < ActiveRecord::Migration[7.1] def change create_table :eventsimple_outbox_cursors do |t| t.string :identifier, null: false - t.integer :group_number, null: false t.bigint :cursor, null: false - t.index [:identifier, :group_number], unique: true, - name: 'index_eventsimple_outbox_cursors_event_klass_group_number' + t.index [:identifier], unique: true end end end diff --git a/spec/dummy/db/schema.rb b/spec/dummy/db/schema.rb index d02b4238d..aa8ceff76 100644 --- a/spec/dummy/db/schema.rb +++ b/spec/dummy/db/schema.rb @@ -16,9 +16,8 @@ create_table "eventsimple_outbox_cursors", force: :cascade do |t| t.string "identifier", null: false - t.integer "group_number", null: false t.bigint "cursor", null: false - t.index ["identifier", "group_number"], name: "idx_on_identifier_group_number_1aba2c6d46", unique: true + t.index ["identifier"], name: "index_eventsimple_outbox_cursors_on_identifier", unique: true end create_table "user_events", force: :cascade do |t| diff --git a/spec/dummy/spec/components/user_component/consumer_spec.rb b/spec/dummy/spec/components/user_component/consumer_spec.rb index bde5d5704..5cd328ae7 100644 --- a/spec/dummy/spec/components/user_component/consumer_spec.rb +++ b/spec/dummy/spec/components/user_component/consumer_spec.rb @@ -1,5 +1,5 @@ RSpec.describe UserComponent::Consumer do - subject(:run_consumer) { described_class.run_consumer(group_number: 0) } + subject(:run_consumer) { described_class.run_consumer } before do allow(described_class).to receive(:sleep) do @@ -47,14 +47,14 @@ it 'records the last processed event position' do event = create(:user_event) - cursor = Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer', group_number: 0) + cursor = Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer') expect(cursor).to be(0) expect(described_class._processor).to receive(:call).once run_consumer - cursor = Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer', group_number: 0) + cursor = Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer') expect(cursor).to eq(event.id) end