Skip to content

Commit

Permalink
Merge pull request #284 from alphagov/refac-sync-3
Browse files Browse the repository at this point in the history
Refactor sync: Move locking to its own service
  • Loading branch information
csutter authored Jun 5, 2024
2 parents 06d819f + 7b7af85 commit 6e551ab
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 175 deletions.
72 changes: 72 additions & 0 deletions app/services/coordination/document_lock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
module Coordination
# Handles locking documents using Redlock to avoid multiple workers attempting to operate on the
# same document simultaneously.
#
# If lock acquisition fails (for example, if Redis is down or the configured number of attempts
# have been exhausted), the error is logged but not propagated. This allows consumers to continue
# their operation even if locking is unavailable, enabling "graceful" degradation at the cost of
# potential rare race conditions (which should not have major impact because when we receive a
# number of messages for a document in quick succession, they usually contain the same content
# anyway).
class DocumentLock
# Redis key prefix for locks and versions
KEY_PREFIX = "search_api_v2:sync_lock".freeze

# Time-to-live for document locks if not explicitly released
TTL = 30.seconds

# Options for Redlock client around handling retries
RETRY_COUNT = 5
RETRY_DELAY = 5.seconds
RETRY_JITTER = 5.seconds

def initialize(content_id)
@content_id = content_id
end

# Attempts to lock the document until unlocked or the lock expires, returns whether or not the
# lock was successfully acquired, and logs any error if not.
def acquire
@lock_info = redlock_client.lock(key, TTL.in_milliseconds)
log_acquire_failure unless @lock_info

!!@lock_info
rescue StandardError => e
log_acquire_failure(e)

false
end

# Releases the lock on the document if it is currently locked by this instance.
def release
return false unless @lock_info

redlock_client.unlock(@lock_info)
@lock_info = nil
end

private

attr_reader :content_id

def key
"#{KEY_PREFIX}:#{content_id}"
end

def redlock_client
@redlock_client ||= Redlock::Client.new(
Rails.configuration.redlock_redis_instances,
retry_count: RETRY_COUNT,
retry_delay: RETRY_DELAY.in_milliseconds,
retry_jitter: RETRY_JITTER.in_milliseconds,
)
end

def log_acquire_failure(error = nil)
Rails.logger.warn(
"[#{self.class.name}] Failed to acquire lock for document: #{content_id}",
)
GovukError.notify(error) if error
end
end
end
30 changes: 16 additions & 14 deletions app/services/discovery_engine/sync/delete.rb
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
module DiscoveryEngine::Sync
class Delete < Operation
def call
with_locked_document do
if outdated_payload_version?
log(
Logger::Severity::INFO,
"Ignored as newer version (#{latest_synced_version}) already synced",
)
Metrics::Exported.increment_counter(
:discovery_engine_requests, type: "delete", status: "ignored_outdated"
)
return
end
lock.acquire

client.delete_document(name: document_name)

set_latest_synced_version
if outdated_payload_version?
log(
Logger::Severity::INFO,
"Ignored as newer version (#{latest_synced_version}) already synced",
)
Metrics::Exported.increment_counter(
:discovery_engine_requests, type: "delete", status: "ignored_outdated"
)
return
end

client.delete_document(name: document_name)

set_latest_synced_version

log(Logger::Severity::INFO, "Successfully deleted")
Metrics::Exported.increment_counter(
:discovery_engine_requests, type: "delete", status: "success"
Expand All @@ -39,6 +39,8 @@ def call
Metrics::Exported.increment_counter(
:discovery_engine_requests, type: "delete", status: "error"
)
ensure
lock.release
end
end
end
89 changes: 0 additions & 89 deletions app/services/discovery_engine/sync/locking.rb

This file was deleted.

6 changes: 5 additions & 1 deletion app/services/discovery_engine/sync/operation.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module DiscoveryEngine::Sync
class Operation
include Locking
include Versioning

def initialize(content_id, payload_version: nil, client: nil)
@content_id = content_id
Expand All @@ -12,6 +12,10 @@ def initialize(content_id, payload_version: nil, client: nil)

attr_reader :content_id, :payload_version, :client

def lock
@lock ||= Coordination::DocumentLock.new(content_id)
end

def document_name
"#{Rails.configuration.discovery_engine_datastore_branch}/documents/#{content_id}"
end
Expand Down
54 changes: 28 additions & 26 deletions app/services/discovery_engine/sync/put.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,35 @@ def initialize(content_id, metadata = nil, content: "", payload_version: nil, cl
end

def call
with_locked_document do
if outdated_payload_version?
log(
Logger::Severity::INFO,
"Ignored as newer version (#{latest_synced_version}) already synced",
)
Metrics::Exported.increment_counter(
:discovery_engine_requests, type: "put", status: "ignored_outdated"
)
return
end

client.update_document(
document: {
id: content_id,
name: document_name,
json_data: metadata.merge(payload_version:).to_json,
content: {
mime_type: MIME_TYPE,
# The Google client expects an IO object to extract raw byte content from
raw_bytes: StringIO.new(content),
},
},
allow_missing: true,
)
lock.acquire

set_latest_synced_version
if outdated_payload_version?
log(
Logger::Severity::INFO,
"Ignored as newer version (#{latest_synced_version}) already synced",
)
Metrics::Exported.increment_counter(
:discovery_engine_requests, type: "put", status: "ignored_outdated"
)
return
end

client.update_document(
document: {
id: content_id,
name: document_name,
json_data: metadata.merge(payload_version:).to_json,
content: {
mime_type: MIME_TYPE,
# The Google client expects an IO object to extract raw byte content from
raw_bytes: StringIO.new(content),
},
},
allow_missing: true,
)

set_latest_synced_version

log(Logger::Severity::INFO, "Successfully added/updated")
Metrics::Exported.increment_counter(
:discovery_engine_requests, type: "put", status: "success"
Expand All @@ -50,6 +50,8 @@ def call
)
GovukError.notify(e)
Metrics::Exported.increment_counter(:discovery_engine_requests, type: "put", status: "error")
ensure
lock.release
end

private
Expand Down
32 changes: 32 additions & 0 deletions app/services/discovery_engine/sync/versioning.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
module DiscoveryEngine::Sync
module Versioning
# Redis key prefix for versions
VERSION_KEY_PREFIX = "search_api_v2:latest_synced_version".freeze

def outdated_payload_version?
# Sense check: This shouldn't ever come through as nil from Publishing API, but if it does,
# the only really useful thing we can do is ignore this check entirely because we can't
# meaningfully make a comparison.
return false if payload_version.nil?

# If there is no remote version yet, our version is always newer by definition
return false if latest_synced_version.nil?

latest_synced_version.to_i >= payload_version.to_i
end

# Gets the latest synced version for a document from Redis
def latest_synced_version
Rails.application.config.redis_pool.with do |redis|
redis.get("#{VERSION_KEY_PREFIX}:#{content_id}")&.to_i
end
end

# Sets the latest synced version for a document in Redis
def set_latest_synced_version
Rails.application.config.redis_pool.with do |redis|
redis.set("#{VERSION_KEY_PREFIX}:#{content_id}", payload_version)
end
end
end
end
Loading

0 comments on commit 6e551ab

Please sign in to comment.