Skip to content

Commit

Permalink
remove consumer group config
Browse files Browse the repository at this point in the history
  • Loading branch information
desheikh committed May 7, 2024
1 parent efb4dce commit f80e88c
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 24 deletions.
1 change: 0 additions & 1 deletion lib/eventsimple/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ def drives_events_for(aggregate_klass, aggregate_id:, events_namespace: nil)
self._aggregate_id = aggregate_id

class_attribute :_outbox_enabled
class_attribute :_consumer_group_size

class_attribute :_on_invalid_transition
self._on_invalid_transition = ->(error) { raise error }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ class CreateEventsimpleOutboxCursor < ActiveRecord::Migration<%= migration_versi
def change
create_table :eventsimple_outbox_cursors do |t|
t.string :event_klass, null: false
t.integer :group_number, null: false
t.bigint :cursor, null: false

t.index [:event_klass, :group_number], unique: true
t.index [:event_klass], unique: true
end
end
end
15 changes: 7 additions & 8 deletions lib/eventsimple/outbox/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ def identifier(name)
self._identifier = name
end

def consumes_event(event_klass, group_size: 1)
def consumes_event(event_klass)
event_klass._outbox_enabled = true
event_klass._consumer_group_size = group_size

self._event_klass = event_klass
end
Expand All @@ -32,7 +31,7 @@ def processor(processor_klass)
self._processor = processor_klass.new
end

def start(group_number: 0) # rubocop:disable Metrics/AbcSize
def start # rubocop:disable Metrics/AbcSize
Signal.trap('INT') do
self.stop_consumer = true
$stdout.puts('INT received, stopping consumer')
Expand All @@ -42,17 +41,17 @@ def start(group_number: 0) # rubocop:disable Metrics/AbcSize
$stdout.puts('TERM received, stopping consumer')
end

run_consumer(group_number: group_number)
run_consumer
end

def run_consumer(group_number:)
def run_consumer
raise 'Eventsimple: No event class defined' unless _event_klass
raise 'Eventsimple: No processor defined' unless _processor
raise 'Eventsimple: No identifier defined' unless _identifier

Rails.logger.info("Starting consumer for #{_identifier}, processing #{_event_klass} events with group number #{group_number}")
Rails.logger.info("Starting consumer for #{_identifier}, processing #{_event_klass} events")

cursor = Outbox::Cursor.fetch(_identifier, group_number: group_number)
cursor = Outbox::Cursor.fetch(_identifier)

until stop_consumer
_event_klass.unscoped.in_batches(start: cursor + 1, load: true).each do |batch|
Expand All @@ -63,7 +62,7 @@ def run_consumer(group_number:)
break if stop_consumer
end

Outbox::Cursor.set(_identifier, cursor, group_number: group_number)
Outbox::Cursor.set(_identifier, cursor)
break if stop_consumer
end

Expand Down
9 changes: 4 additions & 5 deletions lib/eventsimple/outbox/models/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@ module Outbox
class Cursor < Eventsimple.configuration.parent_record_klass
self.table_name = 'eventsimple_outbox_cursors'

def self.fetch(identifier, group_number: 0)
existing = find_by(identifier: identifier.to_s, group_number: group_number)
def self.fetch(identifier)
existing = find_by(identifier: identifier.to_s)
existing ? existing.cursor : 0
end

def self.set(identifier, cursor, group_number: 0)
def self.set(identifier, cursor)
upsert(
{
identifier: identifier.to_s,
group_number: group_number,
cursor: cursor,
},
unique_by: [:identifier, :group_number],
unique_by: [:identifier],
)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ class CreateEventsimpleOutboxCursor < ActiveRecord::Migration[7.1]
def change
create_table :eventsimple_outbox_cursors do |t|
t.string :identifier, null: false
t.integer :group_number, null: false
t.bigint :cursor, null: false

t.index [:identifier, :group_number], unique: true,
name: 'index_eventsimple_outbox_cursors_event_klass_group_number'
t.index [:identifier], unique: true
end
end
end
3 changes: 1 addition & 2 deletions spec/dummy/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@

create_table "eventsimple_outbox_cursors", force: :cascade do |t|
t.string "identifier", null: false
t.integer "group_number", null: false
t.bigint "cursor", null: false
t.index ["identifier", "group_number"], name: "idx_on_identifier_group_number_1aba2c6d46", unique: true
t.index ["identifier"], name: "index_eventsimple_outbox_cursors_on_identifier", unique: true
end

create_table "user_events", force: :cascade do |t|
Expand Down
6 changes: 3 additions & 3 deletions spec/dummy/spec/components/user_component/consumer_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
RSpec.describe UserComponent::Consumer do
subject(:run_consumer) { described_class.run_consumer(group_number: 0) }
subject(:run_consumer) { described_class.run_consumer }

before do
allow(described_class).to receive(:sleep) do
Expand Down Expand Up @@ -47,14 +47,14 @@
it 'records the last processed event position' do
event = create(:user_event)

cursor = Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer', group_number: 0)
cursor = Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')
expect(cursor).to be(0)

expect(described_class._processor).to receive(:call).once

run_consumer

cursor = Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer', group_number: 0)
cursor = Eventsimple::Outbox::Cursor.fetch('UserComponent::Consumer')
expect(cursor).to eq(event.id)
end

Expand Down

0 comments on commit f80e88c

Please sign in to comment.