From d25876582646068db7518aa6d2c7c38257e0d0a5 Mon Sep 17 00:00:00 2001 From: Martyn Lloyd-Kelly Date: Thu, 14 Sep 2023 10:19:51 +0100 Subject: [PATCH] Add basic ability to consume publishing queue - Add `govuk_message_queue_consumer` gem - Add `PublishedDocumentsQueueConsumer` to process incoming messages from publishing queue (and simply log payload for now) - Add Rake task to run the consumer against the queue - Add Rake task to create queue in development environment --- Gemfile | 1 + Gemfile.lock | 16 ++++++++++++ .../published_documents_queue_consumer.rb | 6 +++++ lib/tasks/message_queue.rake | 21 ++++++++++++++++ ...published_documents_queue_consumer_spec.rb | 25 +++++++++++++++++++ 5 files changed, 69 insertions(+) create mode 100644 app/queue_consumers/published_documents_queue_consumer.rb create mode 100644 lib/tasks/message_queue.rake create mode 100644 spec/queue_consumers/published_documents_queue_consumer_spec.rb diff --git a/Gemfile b/Gemfile index 2a4388d..6cf381a 100644 --- a/Gemfile +++ b/Gemfile @@ -11,6 +11,7 @@ gem "railties", RAILS_GEMS_VERSION gem "bootsnap", require: false gem "govuk_app_config" +gem "govuk_message_queue_consumer" group :test do gem "simplecov", require: false diff --git a/Gemfile.lock b/Gemfile.lock index 2526fb2..d1f2ce4 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -23,11 +23,15 @@ GEM tzinfo (~> 2.0) addressable (2.8.5) public_suffix (>= 2.0.2, < 6.0) + amq-protocol (2.3.2) ast (2.4.2) bootsnap (1.16.0) msgpack (~> 1.2) brakeman (6.0.1) builder (3.2.4) + bunny (2.22.0) + amq-protocol (~> 2.3, >= 2.3.1) + sorted_set (~> 1, >= 1.0.2) capybara (3.39.2) addressable matrix @@ -46,6 +50,7 @@ GEM docile (1.4.0) erubi (1.12.0) google-protobuf (3.24.2-aarch64-linux) + google-protobuf (3.24.2-x86_64-darwin) google-protobuf (3.24.2-x86_64-linux) googleapis-common-protos-types (1.8.0) google-protobuf (~> 3.18) @@ -61,6 +66,8 @@ GEM sentry-rails (~> 5.3) sentry-ruby (~> 5.3) statsd-ruby (~> 1.5) + govuk_message_queue_consumer (4.1.0) + bunny (~> 2.17) govuk_test (3.0.1) brakeman (>= 5.0.2) capybara (>= 3.36) @@ -87,6 +94,8 @@ GEM nio4r (2.5.9) nokogiri (1.15.4-aarch64-linux) racc (~> 1.4) + nokogiri (1.15.4-x86_64-darwin) + racc (~> 1.4) nokogiri (1.15.4-x86_64-linux) racc (~> 1.4) opentelemetry-api (1.2.2) @@ -311,6 +320,7 @@ GEM zeitwerk (~> 2.5) rainbow (3.1.1) rake (13.0.6) + rbtree (0.4.6) regexp_parser (2.8.1) reline (0.3.8) io-console (~> 0.5) @@ -379,12 +389,16 @@ GEM sentry-ruby (~> 5.10.0) sentry-ruby (5.10.0) concurrent-ruby (~> 1.0, >= 1.0.2) + set (1.0.3) simplecov (0.22.0) docile (~> 1.1) simplecov-html (~> 0.11) simplecov_json_formatter (~> 0.1) simplecov-html (0.12.3) simplecov_json_formatter (0.1.4) + sorted_set (1.0.3) + rbtree + set (~> 1.0) statsd-ruby (1.5.0) thor (1.2.2) tzinfo (2.0.6) @@ -398,6 +412,7 @@ GEM PLATFORMS aarch64-linux + x86_64-darwin-22 x86_64-linux DEPENDENCIES @@ -408,6 +423,7 @@ DEPENDENCIES brakeman debug govuk_app_config + govuk_message_queue_consumer govuk_test railties (= 7.0.7.2) rspec-rails diff --git a/app/queue_consumers/published_documents_queue_consumer.rb b/app/queue_consumers/published_documents_queue_consumer.rb new file mode 100644 index 0000000..cd86317 --- /dev/null +++ b/app/queue_consumers/published_documents_queue_consumer.rb @@ -0,0 +1,6 @@ +class PublishedDocumentsQueueConsumer + def process(message) + Rails.logger.info(message.payload) + message.ack + end +end diff --git a/lib/tasks/message_queue.rake b/lib/tasks/message_queue.rake new file mode 100644 index 0000000..c790dad --- /dev/null +++ b/lib/tasks/message_queue.rake @@ -0,0 +1,21 @@ +namespace :message_queue do + desc "Create RabbitMQ queue for development environment" + task create_queue: :environment do + # The exchange, queue, and binding are created via Terraform outside of local development: + # https://github.com/alphagov/govuk-aws/blob/main/terraform/projects/app-publishing-amazonmq/ + raise "This task should only be run in development" unless Rails.env.development? + + bunny = Bunny.new + channel = bunny.start.create_channel + exch = Bunny::Exchange.new(channel, :topic, "published_documents") + channel.queue("search_api_v2_published_documents").bind(exch, routing_key: "*.*") + end + + desc "Listens to and processes messages from the published documents queue" + task consume_published_documents: :environment do + GovukMessageQueueConsumer::Consumer.new( + queue_name: "search_api_v2_published_documents", + processor: PublishedDocumentsQueueConsumer.new, + ).run + end +end diff --git a/spec/queue_consumers/published_documents_queue_consumer_spec.rb b/spec/queue_consumers/published_documents_queue_consumer_spec.rb new file mode 100644 index 0000000..2a666c4 --- /dev/null +++ b/spec/queue_consumers/published_documents_queue_consumer_spec.rb @@ -0,0 +1,25 @@ +require "govuk_message_queue_consumer/test_helpers" + +RSpec.describe PublishedDocumentsQueueConsumer do + subject(:consumer) { described_class.new } + + it_behaves_like "a message queue processor" + + describe "when receiving an incoming message" do + let(:message) { GovukMessageQueueConsumer::MockMessage.new(payload) } + let(:payload) { { "hello" => "world" } } + + before do + allow(Rails.logger).to receive(:info) + consumer.process(message) + end + + it "acks incoming messages" do + expect(message).to be_acked + end + + it "logs the payload of incoming messages" do + expect(Rails.logger).to have_received(:info).with(payload) + end + end +end