Skip to content

Commit

Permalink
eventsimple outbox processor is instantiated once
Browse files Browse the repository at this point in the history
  • Loading branch information
desheikh committed Apr 22, 2024
1 parent 0f822af commit 6d67a12
Show file tree
Hide file tree
Showing 14 changed files with 142 additions and 29 deletions.
4 changes: 4 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,9 @@

source 'https://rubygems.org'

group :test do
gem 'factory_bot_rails'
end

# Specify your gem's dependencies in eventsimple.gemspec
gemspec
4 changes: 4 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ PATH
pg (~> 1.4)
rails (~> 7.0)
retriable (~> 3.1)
with_advisory_lock (>= 5.1)

GEM
remote: https://rubygems.org/
Expand Down Expand Up @@ -348,6 +349,9 @@ GEM
websocket-driver (0.7.6)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.5)
with_advisory_lock (5.1.0)
activerecord (>= 6.1)
zeitwerk (>= 2.6)
ws-style (7.4.3)
rubocop-rspec (>= 2.2.0)
rubocop-vendor (>= 0.11)
Expand Down
7 changes: 1 addition & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,7 @@ end
```ruby
module UserComponent
class EventProcessor
def initialize(event)
@event = event
end
attr_reader :event

def call
def call(event)
Rails.logger.info("PROCESSING EVENT: #{event.id}")
end
end
Expand Down
1 change: 1 addition & 0 deletions eventsimple.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Gem::Specification.new do |spec|
spec.add_runtime_dependency 'pg', '~> 1.4'
spec.add_runtime_dependency 'rails', '~> 7.0'
spec.add_runtime_dependency 'retriable', '~> 3.1'
spec.add_runtime_dependency 'with_advisory_lock', '>= 5.1'

spec.add_development_dependency 'bundle-audit'
spec.add_development_dependency 'factory_bot_rails'
Expand Down
6 changes: 3 additions & 3 deletions lib/eventsimple/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ def drives_events_for(aggregate_klass, aggregate_id:, events_namespace: nil)
class_attribute :_aggregate_id
self._aggregate_id = aggregate_id

class_attribute :_outbox_mode
class_attribute :_outbox_concurrency
class_attribute :_outbox_enabled
class_attribute :_consumer_group_size

class_attribute :_on_invalid_transition
self._on_invalid_transition = ->(error) { raise error }
Expand Down Expand Up @@ -161,7 +161,7 @@ def create!(*args, &block)
end

def with_locks(&block)
if _outbox_mode
if _outbox_enabled
base_class.with_advisory_lock(base_class.name, { transaction: true }, &block)
else
yield
Expand Down
36 changes: 24 additions & 12 deletions lib/eventsimple/outbox/consumer.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require 'with_advisory_lock'
require 'eventsimple/outbox/models/cursor'

module Eventsimple
Expand All @@ -9,41 +10,52 @@ def self.extended(klass)
klass.class_exec do
class_attribute :_event_klass
class_attribute :_processor_klass
class_attribute :_processor
class_attribute :stop_consumer, default: false

Signal.trap('SIGINT') do
self.stop_consumer = true
$stdout.puts('SIGINT received, stopping consumer')
end
end
end

def consumes_event(event_klass, concurrency: 1)
event_klass._outbox_mode = true
event_klass._outbox_concurrency = concurrency
def consumes_event(event_klass, group_size: 1)
event_klass._outbox_enabled = true
event_klass._consumer_group_size = group_size

self._event_klass = event_klass
end

def processor(processor_klass)
self._processor_klass = processor_klass
self._processor = processor_klass.new
end

def start(group_number: 0, stop_at_end: false) # rubocop:disable Metrics/AbcSize
Signal.trap('INT') do
self.stop_consumer = true
$stdout.puts('INT received, stopping consumer')
end
Signal.trap('TERM') do
self.stop_consumer = true
$stdout.puts('TERM received, stopping consumer')
end

run_consumer(group_number: group_number, stop_at_end: stop_at_end)
end

def start # rubocop:disable Metrics/AbcSize
cursor = Outbox::Cursor.fetch(_event_klass, 0)
def run_consumer(group_number:, stop_at_end:)
cursor = Outbox::Cursor.fetch(_event_klass, group_number)

until stop_consumer
_event_klass.unscoped.in_batches(start: cursor + 1, load: true).each do |batch|
batch.each do |event|
_processor_klass.new(event).call
_processor.call(event)

break if stop_consumer
end

cursor = batch.last.id
Outbox::Cursor.set(_event_klass, 0, cursor)
Outbox::Cursor.set(_event_klass, group_number, cursor)
end

break if stop_at_end
sleep(1)
end
end
Expand Down
10 changes: 10 additions & 0 deletions spec/dummy/app/components/user_component/consumer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
require 'eventsimple/outbox/consumer'

module UserComponent
class Consumer
extend Eventsimple::Outbox::Consumer

consumes_event UserEvent
processor Processor
end
end
7 changes: 7 additions & 0 deletions spec/dummy/app/components/user_component/processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module UserComponent
class Processor
def call(event)
# no-op
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# frozen_string_literal: true

class CreateEventsimpleOutboxCursor < ActiveRecord::Migration[7.1]
def change
create_table :eventsimple_outbox_cursors do |t|
t.string :event_klass, null: false
t.integer :group_number, null: false
t.bigint :cursor, null: false

t.index [:event_klass, :group_number], unique: true
end
end
end
9 changes: 8 additions & 1 deletion spec/dummy/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.1].define(version: 2022_09_17_150839) do
ActiveRecord::Schema[7.1].define(version: 2024_04_19_175459) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"

create_table "eventsimple_outbox_cursors", force: :cascade do |t|
t.string "event_klass", null: false
t.integer "group_number", null: false
t.bigint "cursor", null: false
t.index ["event_klass", "group_number"], name: "idx_on_event_klass_group_number_24bcae0807", unique: true
end

create_table "user_events", force: :cascade do |t|
t.string "aggregate_id", null: false
t.string "idempotency_key"
Expand Down
41 changes: 41 additions & 0 deletions spec/dummy/spec/components/user_component/consumer_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
RSpec.describe UserComponent::Consumer do
let(:mock_processor) { instance_double(UserComponent::Processor, :call) }

before do
UserComponent::Consumer._processor = mock_processor
allow(mock_processor).to receive(:call)
end

it 'consumes events' do
expect(UserComponent::Consumer._event_klass).to eq(UserEvent)
end

it 'has a processor' do
expect(UserComponent::Consumer._processor_klass).to eq(UserComponent::Processor)
end

describe '.run_consumer' do
before do
UserComponent::Events::Created.create!(
user: User.new,
data: {
canonical_id: SecureRandom.uuid,
username: 'test-user',
email: '[email protected]',
}
)
end

it 'records the last processed event position' do
cursor = Eventsimple::Outbox::Cursor.fetch(UserEvent, 0)
expect(cursor).to be(0)

UserComponent::Consumer.run_consumer(group_number: 0, stop_at_end: true)

expect(mock_processor).to have_received(:call).once

cursor = Eventsimple::Outbox::Cursor.fetch(UserEvent, 0)
expect(cursor).to eq(UserEvent.last.id)
end
end
end
14 changes: 14 additions & 0 deletions spec/dummy/spec/factories/user_event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

FactoryBot.define do
factory :user_event do
user { build(:user) }
data {
{
canonical_id: SecureRandom.uuid,
username: 'test-user',
email: '[email protected]',
}
}
end
end
6 changes: 5 additions & 1 deletion spec/lib/eventsimple/event_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
end

context 'when entity is stale' do
# disable transactional tests so we can test the retry logic
def self.uses_transaction?(_method) = true
after(:each) { UserEvent.delete_all }

it 'retries and successfully writes the event' do
stale_user = User.find_by(canonical_id: user_canonical_id)

Expand Down Expand Up @@ -48,7 +52,7 @@

context 'when an event class no longer exists' do
it 'uses a no-op deleted class' do
UserEvent.insert({ type: 'NonExistentEvent', aggregate_id: user_canonical_id })
UserEvent.create(type: 'NonExistentEvent', aggregate_id: user_canonical_id)

event = UserEvent.last
expect(event).to be_a(UserEvent::Deleted__NonExistentEvent)
Expand Down
13 changes: 7 additions & 6 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
# frozen_string_literal: true
ENV['RAILS_ENV'] ||= 'test'

require File.expand_path('../spec/dummy/config/environment.rb', __dir__)
ENV['RAILS_ROOT'] ||= "#{File.dirname(__FILE__)}../../../spec/dummy"

require 'pry'
require 'eventsimple'
require 'eventsimple/support/spec_helpers'

require 'retriable'
require 'rspec/rails'

RSpec.configure do |config|
config.expect_with :rspec do |expectations|
Expand All @@ -18,6 +23,7 @@
config.filter_run_when_matching :focus
config.example_status_persistence_file_path = "spec/examples.txt"
config.disable_monkey_patching!
config.use_transactional_fixtures = true

if config.files_to_run.one?
config.default_formatter = "doc"
Expand All @@ -27,11 +33,6 @@

Kernel.srand config.seed

require File.expand_path('../spec/dummy/config/environment.rb', __dir__)
ENV['RAILS_ROOT'] ||= "#{File.dirname(__FILE__)}../../../spec/dummy"

require 'rspec/rails'

ActiveRecord::Migration.maintain_test_schema!

Retriable.configure do |c|
Expand Down

0 comments on commit 6d67a12

Please sign in to comment.