diff --git a/app/services/coordination/document_lock.rb b/app/services/coordination/document_lock.rb new file mode 100644 index 0000000..0b927f5 --- /dev/null +++ b/app/services/coordination/document_lock.rb @@ -0,0 +1,72 @@ +module Coordination + # Handles locking documents using Redlock to avoid multiple workers attempting to operate on the + # same document simultaneously. + # + # If lock acquisition fails (for example, if Redis is down or the configured number of attempts + # have been exhausted), the error is logged but not propagated. This allows consumers to continue + # their operation even if locking is unavailable, enabling "graceful" degradation at the cost of + # potential rare race conditions (which should not have major impact because when we receive a + # number of messages for a document in quick succession, they usually contain the same content + # anyway). + class DocumentLock + # Redis key prefix for locks and versions + KEY_PREFIX = "search_api_v2:sync_lock".freeze + + # Time-to-live for document locks if not explicitly released + TTL = 30.seconds + + # Options for Redlock client around handling retries + RETRY_COUNT = 5 + RETRY_DELAY = 5.seconds + RETRY_JITTER = 5.seconds + + def initialize(content_id) + @content_id = content_id + end + + # Attempts to lock the document until unlocked or the lock expires, returns whether or not the + # lock was successfully acquired, and logs any error if not. + def acquire + @lock_info = redlock_client.lock(key, TTL.in_milliseconds) + log_acquire_failure unless @lock_info + + !!@lock_info + rescue StandardError => e + log_acquire_failure(e) + + false + end + + # Releases the lock on the document if it is currently locked by this instance. + def release + return false unless @lock_info + + redlock_client.unlock(@lock_info) + @lock_info = nil + end + + private + + attr_reader :content_id + + def key + "#{KEY_PREFIX}:#{content_id}" + end + + def redlock_client + @redlock_client ||= Redlock::Client.new( + Rails.configuration.redlock_redis_instances, + retry_count: RETRY_COUNT, + retry_delay: RETRY_DELAY.in_milliseconds, + retry_jitter: RETRY_JITTER.in_milliseconds, + ) + end + + def log_acquire_failure(error = nil) + Rails.logger.warn( + "[#{self.class.name}] Failed to acquire lock for document: #{content_id}", + ) + GovukError.notify(error) if error + end + end +end diff --git a/app/services/discovery_engine/sync/delete.rb b/app/services/discovery_engine/sync/delete.rb index ac1a6c3..ffa15f0 100644 --- a/app/services/discovery_engine/sync/delete.rb +++ b/app/services/discovery_engine/sync/delete.rb @@ -1,23 +1,23 @@ module DiscoveryEngine::Sync class Delete < Operation def call - with_locked_document do - if outdated_payload_version? - log( - Logger::Severity::INFO, - "Ignored as newer version (#{latest_synced_version}) already synced", - ) - Metrics::Exported.increment_counter( - :discovery_engine_requests, type: "delete", status: "ignored_outdated" - ) - return - end + lock.acquire - client.delete_document(name: document_name) - - set_latest_synced_version + if outdated_payload_version? + log( + Logger::Severity::INFO, + "Ignored as newer version (#{latest_synced_version}) already synced", + ) + Metrics::Exported.increment_counter( + :discovery_engine_requests, type: "delete", status: "ignored_outdated" + ) + return end + client.delete_document(name: document_name) + + set_latest_synced_version + log(Logger::Severity::INFO, "Successfully deleted") Metrics::Exported.increment_counter( :discovery_engine_requests, type: "delete", status: "success" @@ -39,6 +39,8 @@ def call Metrics::Exported.increment_counter( :discovery_engine_requests, type: "delete", status: "error" ) + ensure + lock.release end end end diff --git a/app/services/discovery_engine/sync/locking.rb b/app/services/discovery_engine/sync/locking.rb deleted file mode 100644 index d3e64af..0000000 --- a/app/services/discovery_engine/sync/locking.rb +++ /dev/null @@ -1,89 +0,0 @@ -module DiscoveryEngine::Sync - # Mixin providing a mechanism for sync operations (`Put`, `Delete`) to lock documents while they - # are being operated on so that other workers cannot simultaneously update the same document. - module Locking - # Redis key prefix for locks and versions - LOCK_KEY_PREFIX = "search_api_v2:sync_lock".freeze - VERSION_KEY_PREFIX = "search_api_v2:latest_synced_version".freeze - - # Time-to-live for document locks if not explicitly released - DOCUMENT_LOCK_TTL = 30.seconds.in_milliseconds - - # Options for Redlock client around handling retries - RETRY_COUNT = 10 - RETRY_DELAY = 5.seconds.in_milliseconds - RETRY_JITTER = 5.seconds.in_milliseconds - - # Locks a document while a critical section block is executed to avoid multiple workers - # competing to update the same document. - def with_locked_document(&critical_section) - redlock_client.lock!( - "#{LOCK_KEY_PREFIX}:#{content_id}", - DOCUMENT_LOCK_TTL, - ) do - Rails.logger.add( - Logger::Severity::INFO, - "[#{self.class.name}] Lock acquired for document: #{content_id}, " \ - "payload_version: #{payload_version}", - ) - - critical_section.call - - Rails.logger.add( - Logger::Severity::INFO, - "[#{self.class.name}] Releasing lock for document: #{content_id}, " \ - "payload_version: #{payload_version}", - ) - end - rescue Redlock::LockError => e - # This should be a very rare occurrence (for example, if/when Redis is down). Our "least - # worst" fallback option is to perform the action anyway without the lock (which is the - # previous behaviour from before we had this locking mechanism). - Rails.logger.add( - Logger::Severity::ERROR, - "[#{self.class.name}] Failed to acquire lock for document: #{content_id}, " \ - "payload_version: #{payload_version}. Continuing without lock.", - ) - GovukError.notify(e) - - critical_section.call - end - - def outdated_payload_version? - # Sense check: This shouldn't ever come through as nil from Publishing API, but if it does, - # the only really useful thing we can do is ignore this check entirely because we can't - # meaningfully make a comparison. - return false if payload_version.nil? - - # If there is no remote version yet, our version is always newer by definition - return false if latest_synced_version.nil? - - latest_synced_version.to_i >= payload_version.to_i - end - - # Gets the latest synced version for a document from Redis - def latest_synced_version - Rails.application.config.redis_pool.with do |redis| - redis.get("#{VERSION_KEY_PREFIX}:#{content_id}")&.to_i - end - end - - # Sets the latest synced version for a document in Redis - def set_latest_synced_version - Rails.application.config.redis_pool.with do |redis| - redis.set("#{VERSION_KEY_PREFIX}:#{content_id}", payload_version) - end - end - - private - - def redlock_client - @redlock_client ||= Redlock::Client.new( - Rails.configuration.redlock_redis_instances, - retry_count: RETRY_COUNT, - retry_delay: RETRY_DELAY, - retry_jitter: RETRY_JITTER, - ) - end - end -end diff --git a/app/services/discovery_engine/sync/operation.rb b/app/services/discovery_engine/sync/operation.rb index d798308..4a5edb9 100644 --- a/app/services/discovery_engine/sync/operation.rb +++ b/app/services/discovery_engine/sync/operation.rb @@ -1,6 +1,6 @@ module DiscoveryEngine::Sync class Operation - include Locking + include Versioning def initialize(content_id, payload_version: nil, client: nil) @content_id = content_id @@ -12,6 +12,10 @@ def initialize(content_id, payload_version: nil, client: nil) attr_reader :content_id, :payload_version, :client + def lock + @lock ||= Coordination::DocumentLock.new(content_id) + end + def document_name "#{Rails.configuration.discovery_engine_datastore_branch}/documents/#{content_id}" end diff --git a/app/services/discovery_engine/sync/put.rb b/app/services/discovery_engine/sync/put.rb index 157ca17..28952b8 100644 --- a/app/services/discovery_engine/sync/put.rb +++ b/app/services/discovery_engine/sync/put.rb @@ -10,35 +10,35 @@ def initialize(content_id, metadata = nil, content: "", payload_version: nil, cl end def call - with_locked_document do - if outdated_payload_version? - log( - Logger::Severity::INFO, - "Ignored as newer version (#{latest_synced_version}) already synced", - ) - Metrics::Exported.increment_counter( - :discovery_engine_requests, type: "put", status: "ignored_outdated" - ) - return - end - - client.update_document( - document: { - id: content_id, - name: document_name, - json_data: metadata.merge(payload_version:).to_json, - content: { - mime_type: MIME_TYPE, - # The Google client expects an IO object to extract raw byte content from - raw_bytes: StringIO.new(content), - }, - }, - allow_missing: true, - ) + lock.acquire - set_latest_synced_version + if outdated_payload_version? + log( + Logger::Severity::INFO, + "Ignored as newer version (#{latest_synced_version}) already synced", + ) + Metrics::Exported.increment_counter( + :discovery_engine_requests, type: "put", status: "ignored_outdated" + ) + return end + client.update_document( + document: { + id: content_id, + name: document_name, + json_data: metadata.merge(payload_version:).to_json, + content: { + mime_type: MIME_TYPE, + # The Google client expects an IO object to extract raw byte content from + raw_bytes: StringIO.new(content), + }, + }, + allow_missing: true, + ) + + set_latest_synced_version + log(Logger::Severity::INFO, "Successfully added/updated") Metrics::Exported.increment_counter( :discovery_engine_requests, type: "put", status: "success" @@ -50,6 +50,8 @@ def call ) GovukError.notify(e) Metrics::Exported.increment_counter(:discovery_engine_requests, type: "put", status: "error") + ensure + lock.release end private diff --git a/app/services/discovery_engine/sync/versioning.rb b/app/services/discovery_engine/sync/versioning.rb new file mode 100644 index 0000000..7e04d9e --- /dev/null +++ b/app/services/discovery_engine/sync/versioning.rb @@ -0,0 +1,32 @@ +module DiscoveryEngine::Sync + module Versioning + # Redis key prefix for versions + VERSION_KEY_PREFIX = "search_api_v2:latest_synced_version".freeze + + def outdated_payload_version? + # Sense check: This shouldn't ever come through as nil from Publishing API, but if it does, + # the only really useful thing we can do is ignore this check entirely because we can't + # meaningfully make a comparison. + return false if payload_version.nil? + + # If there is no remote version yet, our version is always newer by definition + return false if latest_synced_version.nil? + + latest_synced_version.to_i >= payload_version.to_i + end + + # Gets the latest synced version for a document from Redis + def latest_synced_version + Rails.application.config.redis_pool.with do |redis| + redis.get("#{VERSION_KEY_PREFIX}:#{content_id}")&.to_i + end + end + + # Sets the latest synced version for a document in Redis + def set_latest_synced_version + Rails.application.config.redis_pool.with do |redis| + redis.set("#{VERSION_KEY_PREFIX}:#{content_id}", payload_version) + end + end + end +end diff --git a/spec/services/coordination/document_lock_spec.rb b/spec/services/coordination/document_lock_spec.rb new file mode 100644 index 0000000..558a1ff --- /dev/null +++ b/spec/services/coordination/document_lock_spec.rb @@ -0,0 +1,95 @@ +RSpec.describe Coordination::DocumentLock do + subject(:document_lock) { described_class.new("content-id") } + + let(:redlock_client) { instance_double(Redlock::Client) } + + before do + allow(Redlock::Client).to receive(:new).and_return(redlock_client) + end + + describe "#acquire" do + context "when locking succeeds" do + before do + allow(redlock_client).to receive(:lock).and_return({ lock: :info }) + end + + it "returns true" do + expect(document_lock.acquire).to be true + end + + it "acquires a lock from the Redlock client for 30 seconds" do + document_lock.acquire + + expect(redlock_client).to have_received(:lock).with("search_api_v2:sync_lock:content-id", 30_000) + end + end + + context "when locking fails" do + before do + allow(redlock_client).to receive(:lock).and_return(false) + allow(Rails.logger).to receive(:warn) + end + + it "returns false" do + expect(document_lock.acquire).to be false + end + + it "logs an error" do + document_lock.acquire + + expect(Rails.logger).to have_received(:warn) + .with("[Coordination::DocumentLock] Failed to acquire lock for document: content-id") + end + end + + context "when locking raises an error" do + let(:error) { StandardError.new("uh oh") } + + before do + allow(redlock_client).to receive(:lock).and_raise(error) + allow(Rails.logger).to receive(:warn) + allow(GovukError).to receive(:notify) + end + + it "returns false" do + expect(document_lock.acquire).to be false + end + + it "logs an error" do + document_lock.acquire + + expect(Rails.logger).to have_received(:warn) + .with("[Coordination::DocumentLock] Failed to acquire lock for document: content-id") + end + + it "sends the error to GovukError" do + document_lock.acquire + + expect(GovukError).to have_received(:notify).with(error) + end + end + end + + describe "#unlock" do + context "when the document is not locked" do + it "returns false" do + expect(document_lock.release).to be false + end + end + + context "when the document is locked" do + before do + allow(redlock_client).to receive(:lock).and_return({ lock: :info }) + allow(redlock_client).to receive(:unlock) + + document_lock.acquire + end + + it "releases the lock" do + document_lock.release + + expect(redlock_client).to have_received(:unlock).with({ lock: :info }) + end + end + end +end diff --git a/spec/services/discovery_engine/sync/delete_spec.rb b/spec/services/discovery_engine/sync/delete_spec.rb index 2623a20..43602d0 100644 --- a/spec/services/discovery_engine/sync/delete_spec.rb +++ b/spec/services/discovery_engine/sync/delete_spec.rb @@ -1,7 +1,7 @@ RSpec.describe DiscoveryEngine::Sync::Delete do let(:client) { double("DocumentService::Client", delete_document: nil) } let(:logger) { double("Logger", add: nil) } - let(:redlock_client) { instance_double(Redlock::Client) } + let(:lock) { instance_double(Coordination::DocumentLock, acquire: true, release: true) } let(:redis_client) { instance_double(Redis, get: "0", set: nil) } before do @@ -9,11 +9,7 @@ allow(Rails.configuration).to receive(:discovery_engine_datastore_branch).and_return("branch") allow(GovukError).to receive(:notify) - allow(Redlock::Client).to receive(:new).and_return(redlock_client) - allow(redlock_client).to receive(:lock!) - .with("search_api_v2:sync_lock:some_content_id", anything) - .and_yield - + allow(Coordination::DocumentLock).to receive(:new).with("some_content_id").and_return(lock) allow(Rails.application.config.redis_pool).to receive(:with).and_yield(redis_client) end @@ -40,6 +36,11 @@ "[DiscoveryEngine::Sync::Delete] Successfully deleted content_id:some_content_id payload_version:1", ) end + + it "acquires and releases the lock" do + expect(lock).to have_received(:acquire) + expect(lock).to have_received(:release) + end end context "when the incoming document is older than the remote version" do @@ -95,10 +96,8 @@ end context "when locking the document fails" do - let(:error) { Redlock::LockError.new("resource") } - before do - allow(redlock_client).to receive(:lock!).and_raise(error) + allow(lock).to receive(:acquire).and_return(false) described_class.new("some_content_id", payload_version: "1", client:).call end @@ -106,17 +105,6 @@ it "deletes the document regardless" do expect(client).to have_received(:delete_document) end - - it "logs the failure" do - expect(logger).to have_received(:add).with( - Logger::Severity::ERROR, - "[DiscoveryEngine::Sync::Delete] Failed to acquire lock for document: some_content_id, payload_version: 1. Continuing without lock.", - ) - end - - it "sends the error to Sentry" do - expect(GovukError).to have_received(:notify).with(error) - end end context "when the delete fails because the document doesn't exist" do diff --git a/spec/services/discovery_engine/sync/put_spec.rb b/spec/services/discovery_engine/sync/put_spec.rb index 626602c..b203a65 100644 --- a/spec/services/discovery_engine/sync/put_spec.rb +++ b/spec/services/discovery_engine/sync/put_spec.rb @@ -1,7 +1,7 @@ RSpec.describe DiscoveryEngine::Sync::Put do let(:client) { double("DocumentService::Client", update_document: nil) } let(:logger) { double("Logger", add: nil) } - let(:redlock_client) { instance_double(Redlock::Client) } + let(:lock) { instance_double(Coordination::DocumentLock, acquire: true, release: true) } let(:redis_client) { instance_double(Redis, get: "0", set: nil) } before do @@ -9,10 +9,7 @@ allow(Rails.configuration).to receive(:discovery_engine_datastore_branch).and_return("branch") allow(GovukError).to receive(:notify) - allow(Redlock::Client).to receive(:new).and_return(redlock_client) - allow(redlock_client).to receive(:lock!) - .with("search_api_v2:sync_lock:some_content_id", anything) - .and_yield + allow(Coordination::DocumentLock).to receive(:new).with("some_content_id").and_return(lock) allow(Rails.application.config.redis_pool).to receive(:with).and_yield(redis_client) end @@ -47,11 +44,9 @@ ) end - it "locks the document by content_id during the update call" do - expect(redlock_client).to have_received(:lock!).with( - "search_api_v2:sync_lock:some_content_id", - anything, - ) + it "acquires and releases the lock" do + expect(lock).to have_received(:acquire) + expect(lock).to have_received(:release) end it "sets the new latest remote version" do @@ -133,10 +128,8 @@ end context "when locking the document fails" do - let(:error) { Redlock::LockError.new("resource") } - before do - allow(redlock_client).to receive(:lock!).and_raise(error) + allow(lock).to receive(:acquire).and_return(false) described_class.new( "some_content_id", @@ -150,17 +143,6 @@ it "updates the document regardless" do expect(client).to have_received(:update_document) end - - it "logs the failure" do - expect(logger).to have_received(:add).with( - Logger::Severity::ERROR, - "[DiscoveryEngine::Sync::Put] Failed to acquire lock for document: some_content_id, payload_version: 1. Continuing without lock.", - ) - end - - it "sends the error to Sentry" do - expect(GovukError).to have_received(:notify).with(error) - end end context "when updating the document fails" do diff --git a/spec/services/discovery_engine/sync/locking_spec.rb b/spec/services/discovery_engine/sync/versioning_spec.rb similarity index 96% rename from spec/services/discovery_engine/sync/locking_spec.rb rename to spec/services/discovery_engine/sync/versioning_spec.rb index 3230ca3..b004988 100644 --- a/spec/services/discovery_engine/sync/locking_spec.rb +++ b/spec/services/discovery_engine/sync/versioning_spec.rb @@ -1,4 +1,4 @@ -RSpec.describe DiscoveryEngine::Sync::Locking do +RSpec.describe DiscoveryEngine::Sync::Versioning do subject(:lockable) { DiscoveryEngine::Sync::Operation.new(content_id, payload_version:, client:) } let(:content_id) { "some-content-id" }