diff --git a/CHANGELOG.md b/CHANGELOG.md index fbb0ed1fd..109efe667 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +## 1.4.0 - 2024-04-02 +### Changed +- The outbox processor is instantiated once and takes the event as an argument. +- Added SIGTERM event handling and fixed shutdown behaviour. + ## 1.3.3 - 2024-04-02 ### Changed - add `parent_record` configuration so it can be easily overwritten diff --git a/Gemfile b/Gemfile index e85fcd8ec..6c2791be4 100644 --- a/Gemfile +++ b/Gemfile @@ -2,5 +2,9 @@ source 'https://rubygems.org' +group :test do + gem 'factory_bot_rails' +end + # Specify your gem's dependencies in eventsimple.gemspec gemspec diff --git a/Gemfile.lock b/Gemfile.lock index 38920b412..07b61b238 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,12 +1,13 @@ PATH remote: . specs: - eventsimple (1.3.3) + eventsimple (1.4.3) dry-struct (~> 1.6) dry-types (~> 1.7) pg (~> 1.4) rails (~> 7.0) retriable (~> 3.1) + with_advisory_lock (>= 5.1) GEM remote: https://rubygems.org/ @@ -176,7 +177,7 @@ GEM net-imap net-pop net-smtp - marcel (1.0.3) + marcel (1.0.4) method_source (1.0.0) mini_mime (1.1.5) minitest (5.22.2) @@ -189,7 +190,7 @@ GEM net-protocol net-protocol (0.2.2) timeout - net-smtp (0.4.0.1) + net-smtp (0.5.0) net-protocol nio4r (2.7.0) nokogiri (1.16.2-arm64-darwin) @@ -205,7 +206,7 @@ GEM parser (3.3.0.5) ast (~> 2.4.1) racc - pg (1.5.5) + pg (1.5.6) polyglot (0.3.5) pry (0.14.2) coderay (~> 1.1) @@ -348,6 +349,9 @@ GEM websocket-driver (0.7.6) websocket-extensions (>= 0.1.0) websocket-extensions (0.1.5) + with_advisory_lock (5.1.0) + activerecord (>= 6.1) + zeitwerk (>= 2.6) ws-style (7.4.3) rubocop-rspec (>= 2.2.0) rubocop-vendor (>= 0.11) diff --git a/README.md b/README.md index f57e28609..bc2e5651f 100644 --- a/README.md +++ b/README.md @@ -315,12 +315,7 @@ end ```ruby module UserComponent class EventProcessor - def initialize(event) - @event = event - end - attr_reader :event - - def call + def call(event) Rails.logger.info("PROCESSING EVENT: #{event.id}") end end diff --git a/eventsimple.gemspec b/eventsimple.gemspec index 758d9452f..45a8f3d82 100644 --- a/eventsimple.gemspec +++ b/eventsimple.gemspec @@ -28,9 +28,9 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency 'pg', '~> 1.4' spec.add_runtime_dependency 'rails', '~> 7.0' spec.add_runtime_dependency 'retriable', '~> 3.1' + spec.add_runtime_dependency 'with_advisory_lock', '>= 5.1' spec.add_development_dependency 'bundle-audit' - spec.add_development_dependency 'factory_bot_rails' spec.add_development_dependency 'fuubar' spec.add_development_dependency 'git' spec.add_development_dependency 'guard-rspec' diff --git a/lib/eventsimple/event.rb b/lib/eventsimple/event.rb index 6090bfce8..14394a49b 100644 --- a/lib/eventsimple/event.rb +++ b/lib/eventsimple/event.rb @@ -14,8 +14,8 @@ def drives_events_for(aggregate_klass, aggregate_id:, events_namespace: nil) class_attribute :_aggregate_id self._aggregate_id = aggregate_id - class_attribute :_outbox_mode - class_attribute :_outbox_concurrency + class_attribute :_outbox_enabled + class_attribute :_consumer_group_size class_attribute :_on_invalid_transition self._on_invalid_transition = ->(error) { raise error } @@ -161,7 +161,7 @@ def create!(*args, &block) end def with_locks(&block) - if _outbox_mode + if _outbox_enabled base_class.with_advisory_lock(base_class.name, { transaction: true }, &block) else yield diff --git a/lib/eventsimple/outbox/consumer.rb b/lib/eventsimple/outbox/consumer.rb index 9773731db..4d4a690c8 100644 --- a/lib/eventsimple/outbox/consumer.rb +++ b/lib/eventsimple/outbox/consumer.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true +require 'with_advisory_lock' require 'eventsimple/outbox/models/cursor' module Eventsimple @@ -9,41 +10,52 @@ def self.extended(klass) klass.class_exec do class_attribute :_event_klass class_attribute :_processor_klass + class_attribute :_processor class_attribute :stop_consumer, default: false - - Signal.trap('SIGINT') do - self.stop_consumer = true - $stdout.puts('SIGINT received, stopping consumer') - end end end - def consumes_event(event_klass, concurrency: 1) - event_klass._outbox_mode = true - event_klass._outbox_concurrency = concurrency + def consumes_event(event_klass, group_size: 1) + event_klass._outbox_enabled = true + event_klass._consumer_group_size = group_size self._event_klass = event_klass end def processor(processor_klass) self._processor_klass = processor_klass + self._processor = processor_klass.new + end + + def start(group_number: 0, stop_at_end: false) # rubocop:disable Metrics/AbcSize + Signal.trap('INT') do + self.stop_consumer = true + $stdout.puts('INT received, stopping consumer') + end + Signal.trap('TERM') do + self.stop_consumer = true + $stdout.puts('TERM received, stopping consumer') + end + + run_consumer(group_number: group_number, stop_at_end: stop_at_end) end - def start # rubocop:disable Metrics/AbcSize - cursor = Outbox::Cursor.fetch(_event_klass, 0) + def run_consumer(group_number:, stop_at_end:) + cursor = Outbox::Cursor.fetch(_event_klass, group_number) until stop_consumer _event_klass.unscoped.in_batches(start: cursor + 1, load: true).each do |batch| batch.each do |event| - _processor_klass.new(event).call + _processor.call(event) break if stop_consumer end cursor = batch.last.id - Outbox::Cursor.set(_event_klass, 0, cursor) + Outbox::Cursor.set(_event_klass, group_number, cursor) end + break if stop_at_end sleep(1) end end diff --git a/lib/eventsimple/version.rb b/lib/eventsimple/version.rb index b2a3f355e..36e112113 100644 --- a/lib/eventsimple/version.rb +++ b/lib/eventsimple/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Eventsimple - VERSION = '1.3.3' + VERSION = '1.4.0' end diff --git a/spec/dummy/app/components/user_component/consumer.rb b/spec/dummy/app/components/user_component/consumer.rb new file mode 100644 index 000000000..c78ae9c9d --- /dev/null +++ b/spec/dummy/app/components/user_component/consumer.rb @@ -0,0 +1,10 @@ +require 'eventsimple/outbox/consumer' + +module UserComponent + class Consumer + extend Eventsimple::Outbox::Consumer + + consumes_event UserEvent + processor Processor + end +end diff --git a/spec/dummy/app/components/user_component/processor.rb b/spec/dummy/app/components/user_component/processor.rb new file mode 100644 index 000000000..fbdf1242e --- /dev/null +++ b/spec/dummy/app/components/user_component/processor.rb @@ -0,0 +1,7 @@ +module UserComponent + class Processor + def call(event) + # no-op + end + end +end diff --git a/spec/dummy/db/migrate/20240419175459_create_eventsimple_outbox_cursor.rb b/spec/dummy/db/migrate/20240419175459_create_eventsimple_outbox_cursor.rb new file mode 100644 index 000000000..620675a39 --- /dev/null +++ b/spec/dummy/db/migrate/20240419175459_create_eventsimple_outbox_cursor.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class CreateEventsimpleOutboxCursor < ActiveRecord::Migration[7.1] + def change + create_table :eventsimple_outbox_cursors do |t| + t.string :event_klass, null: false + t.integer :group_number, null: false + t.bigint :cursor, null: false + + t.index [:event_klass, :group_number], unique: true + end + end +end diff --git a/spec/dummy/db/schema.rb b/spec/dummy/db/schema.rb index d6a9864cd..ac1b67175 100644 --- a/spec/dummy/db/schema.rb +++ b/spec/dummy/db/schema.rb @@ -10,10 +10,17 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 2022_09_17_150839) do +ActiveRecord::Schema[7.1].define(version: 2024_04_19_175459) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" + create_table "eventsimple_outbox_cursors", force: :cascade do |t| + t.string "event_klass", null: false + t.integer "group_number", null: false + t.bigint "cursor", null: false + t.index ["event_klass", "group_number"], name: "idx_on_event_klass_group_number_24bcae0807", unique: true + end + create_table "user_events", force: :cascade do |t| t.string "aggregate_id", null: false t.string "idempotency_key" diff --git a/spec/dummy/spec/components/user_component/consumer_spec.rb b/spec/dummy/spec/components/user_component/consumer_spec.rb new file mode 100644 index 000000000..79d3018ce --- /dev/null +++ b/spec/dummy/spec/components/user_component/consumer_spec.rb @@ -0,0 +1,41 @@ +RSpec.describe UserComponent::Consumer do + let(:mock_processor) { instance_double(UserComponent::Processor, :call) } + + before do + described_class._processor = mock_processor + allow(mock_processor).to receive(:call) + end + + it 'consumes events' do + expect(described_class._event_klass).to eq(UserEvent) + end + + it 'has a processor' do + expect(described_class._processor_klass).to eq(UserComponent::Processor) + end + + describe '.run_consumer' do + before do + UserComponent::Events::Created.create!( + user: User.new, + data: { + canonical_id: SecureRandom.uuid, + username: 'test-user', + email: 'test@example.com', + }, + ) + end + + it 'records the last processed event position' do + cursor = Eventsimple::Outbox::Cursor.fetch(UserEvent, 0) + expect(cursor).to be(0) + + described_class.run_consumer(group_number: 0, stop_at_end: true) + + expect(mock_processor).to have_received(:call).once + + cursor = Eventsimple::Outbox::Cursor.fetch(UserEvent, 0) + expect(cursor).to eq(UserEvent.last.id) + end + end +end diff --git a/spec/dummy/spec/factories/user_event.rb b/spec/dummy/spec/factories/user_event.rb new file mode 100644 index 000000000..f7d4150ce --- /dev/null +++ b/spec/dummy/spec/factories/user_event.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +FactoryBot.define do + factory :user_event do + user { build(:user) } + data { + { + canonical_id: SecureRandom.uuid, + username: 'test-user', + email: 'test@example.com', + } + } + end +end diff --git a/spec/lib/eventsimple/event_spec.rb b/spec/lib/eventsimple/event_spec.rb index 74a9969a0..a5268a054 100644 --- a/spec/lib/eventsimple/event_spec.rb +++ b/spec/lib/eventsimple/event_spec.rb @@ -15,6 +15,10 @@ end context 'when entity is stale' do + # disable transactional tests so we can test the retry logic + def self.uses_transaction?(_method) = true + after { UserEvent.delete_all } + it 'retries and successfully writes the event' do stale_user = User.find_by(canonical_id: user_canonical_id) @@ -48,7 +52,7 @@ context 'when an event class no longer exists' do it 'uses a no-op deleted class' do - UserEvent.insert({ type: 'NonExistentEvent', aggregate_id: user_canonical_id }) + UserEvent.create!(type: 'NonExistentEvent', aggregate_id: user_canonical_id) event = UserEvent.last expect(event).to be_a(UserEvent::Deleted__NonExistentEvent) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 7f9514ed1..520c8237a 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,9 +1,15 @@ # frozen_string_literal: true +ENV['RAILS_ENV'] ||= 'test' + +require File.expand_path('../spec/dummy/config/environment.rb', __dir__) +ENV['RAILS_ROOT'] ||= "#{File.dirname(__FILE__)}../../../spec/dummy" + +require 'pry' require 'eventsimple' require 'eventsimple/support/spec_helpers' - require 'retriable' +require 'rspec/rails' RSpec.configure do |config| config.expect_with :rspec do |expectations| @@ -18,6 +24,7 @@ config.filter_run_when_matching :focus config.example_status_persistence_file_path = "spec/examples.txt" config.disable_monkey_patching! + config.use_transactional_fixtures = true if config.files_to_run.one? config.default_formatter = "doc" @@ -27,11 +34,6 @@ Kernel.srand config.seed - require File.expand_path('../spec/dummy/config/environment.rb', __dir__) - ENV['RAILS_ROOT'] ||= "#{File.dirname(__FILE__)}../../../spec/dummy" - - require 'rspec/rails' - ActiveRecord::Migration.maintain_test_schema! Retriable.configure do |c|