Skip to content

Commit

Permalink
Merge pull request #291 from alphagov/refac-sync-4
Browse files Browse the repository at this point in the history
Refactor versioning into service class
  • Loading branch information
csutter authored Jun 20, 2024
2 parents 5d84cae + d464e58 commit 583deb5
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 157 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
module DiscoveryEngine::Sync
module Versioning
module Coordination
# Keeps track of the latest version of a document that has been synced. This allows us to avoid
# race conditions where an older document version is processed after a newer one.
class DocumentVersionCache
# Redis key prefix for versions
VERSION_KEY_PREFIX = "search_api_v2:latest_synced_version".freeze

def outdated_payload_version?
def initialize(content_id, payload_version:)
@content_id = content_id
@payload_version = payload_version
end

# Checks whether this document version is outdated (because the cache tracks a newer version).
def outdated?
# Sense check: This shouldn't ever come through as nil from Publishing API, but if it does,
# the only really useful thing we can do is ignore this check entirely because we can't
# meaningfully make a comparison.
Expand All @@ -15,17 +23,24 @@ def outdated_payload_version?
latest_synced_version.to_i >= payload_version.to_i
end

# Gets the latest synced version for a document from Redis
def latest_synced_version
# Sets the latest synced version to the current payload version, for example after a successful
# sync operation.
#
# Note that this method should only be called when holding a lock on the document through
# `DocumentLock` as it does not guarantee any locking of its own.
def set_as_latest_synced_version
Rails.application.config.redis_pool.with do |redis|
redis.get("#{VERSION_KEY_PREFIX}:#{content_id}")&.to_i
redis.set("#{VERSION_KEY_PREFIX}:#{content_id}", payload_version)
end
end

# Sets the latest synced version for a document in Redis
def set_latest_synced_version
private

attr_reader :content_id, :payload_version

def latest_synced_version
Rails.application.config.redis_pool.with do |redis|
redis.set("#{VERSION_KEY_PREFIX}:#{content_id}", payload_version)
redis.get("#{VERSION_KEY_PREFIX}:#{content_id}")&.to_i
end
end
end
Expand Down
6 changes: 3 additions & 3 deletions app/services/discovery_engine/sync/delete.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ class Delete < Operation
def call
lock.acquire

if outdated_payload_version?
if version_cache.outdated?
log(
Logger::Severity::INFO,
"Ignored as newer version (#{latest_synced_version}) already synced",
"Ignored as newer version already synced",
)
Metrics::Exported.increment_counter(
:discovery_engine_requests, type: "delete", status: "ignored_outdated"
Expand All @@ -16,7 +16,7 @@ def call

client.delete_document(name: document_name)

set_latest_synced_version
version_cache.set_as_latest_synced_version

log(Logger::Severity::INFO, "Successfully deleted")
Metrics::Exported.increment_counter(
Expand Down
6 changes: 4 additions & 2 deletions app/services/discovery_engine/sync/operation.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module DiscoveryEngine::Sync
class Operation
include Versioning

def initialize(content_id, payload_version: nil, client: nil)
@content_id = content_id
@payload_version = payload_version
Expand All @@ -16,6 +14,10 @@ def lock
@lock ||= Coordination::DocumentLock.new(content_id)
end

def version_cache
@version_cache ||= Coordination::DocumentVersionCache.new(content_id, payload_version:)
end

def document_name
"#{Rails.configuration.discovery_engine_datastore_branch}/documents/#{content_id}"
end
Expand Down
6 changes: 3 additions & 3 deletions app/services/discovery_engine/sync/put.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ def initialize(content_id, metadata = nil, content: "", payload_version: nil, cl
def call
lock.acquire

if outdated_payload_version?
if version_cache.outdated?
log(
Logger::Severity::INFO,
"Ignored as newer version (#{latest_synced_version}) already synced",
"Ignored as newer version already synced",
)
Metrics::Exported.increment_counter(
:discovery_engine_requests, type: "put", status: "ignored_outdated"
Expand All @@ -37,7 +37,7 @@ def call
allow_missing: true,
)

set_latest_synced_version
version_cache.set_as_latest_synced_version

log(Logger::Severity::INFO, "Successfully added/updated")
Metrics::Exported.increment_counter(
Expand Down
57 changes: 57 additions & 0 deletions spec/services/coordination/document_version_cache_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
RSpec.describe Coordination::DocumentVersionCache do
subject(:document_version_cache) { described_class.new("content-id", payload_version:) }

let(:payload_version) { 1234 }
let(:remote_version) { nil }

let(:redis_client) { instance_double(Redis, get: nil, set: nil) }

before do
allow(Rails.application.config.redis_pool).to receive(:with).and_yield(redis_client)
allow(redis_client).to receive(:get)
.with("search_api_v2:latest_synced_version:content-id").and_return(remote_version)
end

describe "outdated?" do
subject(:outdated) { document_version_cache.outdated? }

context "when the remote version is newer" do
let(:remote_version) { payload_version + 1 }

it { is_expected.to be true }
end

context "when the remote version is the same" do
let(:remote_version) { payload_version }

it { is_expected.to be true }
end

context "when the remote version is older" do
let(:remote_version) { payload_version - 1 }

it { is_expected.to be false }
end

context "when there is no remote version" do
let(:remote_version) { nil }

it { is_expected.to be false }
end

context "when there is no payload version" do
let(:payload_version) { nil }

it { is_expected.to be false }
end
end

describe "set_as_latest_synced_version" do
it "sets the latest synced version" do
document_version_cache.set_as_latest_synced_version

expect(redis_client).to have_received(:set)
.with("search_api_v2:latest_synced_version:content-id", 1234)
end
end
end
52 changes: 12 additions & 40 deletions spec/services/discovery_engine/sync/delete_spec.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
RSpec.describe DiscoveryEngine::Sync::Delete do
let(:client) { double("DocumentService::Client", delete_document: nil) }
let(:logger) { double("Logger", add: nil) }

let(:lock) { instance_double(Coordination::DocumentLock, acquire: true, release: true) }
let(:redis_client) { instance_double(Redis, get: "0", set: nil) }

let(:version_cache) { instance_double(Coordination::DocumentVersionCache, outdated?: outdated, set_as_latest_synced_version: nil) }
let(:outdated) { false }

before do
allow(Rails).to receive(:logger).and_return(logger)
allow(Rails.configuration).to receive(:discovery_engine_datastore_branch).and_return("branch")
allow(GovukError).to receive(:notify)

allow(Coordination::DocumentLock).to receive(:new).with("some_content_id").and_return(lock)
allow(Rails.application.config.redis_pool).to receive(:with).and_yield(redis_client)
allow(Coordination::DocumentVersionCache).to receive(:new)
.with("some_content_id", payload_version: "1").and_return(version_cache)
end

context "when the delete succeeds" do
Expand All @@ -24,10 +28,7 @@
end

it "sets the new latest remote version" do
expect(redis_client).to have_received(:set).with(
"search_api_v2:latest_synced_version:some_content_id",
"1",
)
expect(version_cache).to have_received(:set_as_latest_synced_version)
end

it "logs the delete operation" do
Expand All @@ -43,11 +44,10 @@
end
end

context "when the incoming document is older than the remote version" do
before do
allow(redis_client).to receive(:get)
.with("search_api_v2:latest_synced_version:some_content_id").and_return("42")
context "when the incoming document is outdated" do
let(:outdated) { true }

before do
described_class.new("some_content_id", payload_version: "1", client:).call
end

Expand All @@ -56,41 +56,13 @@
end

it "does not set the remote version" do
expect(redis_client).not_to have_received(:set)
expect(version_cache).not_to have_received(:set_as_latest_synced_version)
end

it "logs that the document hasn't been deleted" do
expect(logger).to have_received(:add).with(
Logger::Severity::INFO,
"[DiscoveryEngine::Sync::Delete] Ignored as newer version (42) already synced content_id:some_content_id payload_version:1",
)
end
end

context "when there is no remote version yet" do
before do
allow(redis_client).to receive(:get)
.with("search_api_v2:latest_synced_version:some_content_id").and_return(nil)

described_class.new("some_content_id", payload_version: "1", client:).call
end

it "deletes the document" do
expect(client).to have_received(:delete_document)
.with(name: "branch/documents/some_content_id")
end

it "sets the new latest remote version" do
expect(redis_client).to have_received(:set).with(
"search_api_v2:latest_synced_version:some_content_id",
"1",
)
end

it "logs the delete operation" do
expect(logger).to have_received(:add).with(
Logger::Severity::INFO,
"[DiscoveryEngine::Sync::Delete] Successfully deleted content_id:some_content_id payload_version:1",
"[DiscoveryEngine::Sync::Delete] Ignored as newer version already synced content_id:some_content_id payload_version:1",
)
end
end
Expand Down
56 changes: 11 additions & 45 deletions spec/services/discovery_engine/sync/put_spec.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
RSpec.describe DiscoveryEngine::Sync::Put do
let(:client) { double("DocumentService::Client", update_document: nil) }
let(:logger) { double("Logger", add: nil) }

let(:lock) { instance_double(Coordination::DocumentLock, acquire: true, release: true) }
let(:redis_client) { instance_double(Redis, get: "0", set: nil) }
let(:version_cache) { instance_double(Coordination::DocumentVersionCache, outdated?: outdated, set_as_latest_synced_version: nil) }
let(:outdated) { false }

before do
allow(Rails).to receive(:logger).and_return(logger)
Expand All @@ -11,7 +13,8 @@

allow(Coordination::DocumentLock).to receive(:new).with("some_content_id").and_return(lock)

allow(Rails.application.config.redis_pool).to receive(:with).and_yield(redis_client)
allow(Coordination::DocumentVersionCache).to receive(:new)
.with("some_content_id", payload_version: "1").and_return(version_cache)
end

context "when updating the document succeeds" do
Expand Down Expand Up @@ -50,10 +53,7 @@
end

it "sets the new latest remote version" do
expect(redis_client).to have_received(:set).with(
"search_api_v2:latest_synced_version:some_content_id",
"1",
)
expect(version_cache).to have_received(:set_as_latest_synced_version)
end

it "logs the put operation" do
Expand All @@ -64,11 +64,10 @@
end
end

context "when the incoming document is older than the remote version" do
before do
allow(redis_client).to receive(:get)
.with("search_api_v2:latest_synced_version:some_content_id").and_return("42")
context "when the incoming document is outdated" do
let(:outdated) { true }

before do
described_class.new(
"some_content_id",
{ foo: "bar" },
Expand All @@ -83,46 +82,13 @@
end

it "does not set the remote version" do
expect(redis_client).not_to have_received(:set)
expect(version_cache).not_to have_received(:set_as_latest_synced_version)
end

it "logs that the document hasn't been updated" do
expect(logger).to have_received(:add).with(
Logger::Severity::INFO,
"[DiscoveryEngine::Sync::Put] Ignored as newer version (42) already synced content_id:some_content_id payload_version:1",
)
end
end

context "when there is no remote version yet" do
before do
allow(redis_client).to receive(:get)
.with("search_api_v2:latest_synced_version:some_content_id").and_return(nil)

described_class.new(
"some_content_id",
{ foo: "bar" },
content: "some content",
payload_version: "1",
client:,
).call
end

it "updates the document" do
expect(client).to have_received(:update_document)
end

it "sets the new latest remote version" do
expect(redis_client).to have_received(:set).with(
"search_api_v2:latest_synced_version:some_content_id",
"1",
)
end

it "logs the put operation" do
expect(logger).to have_received(:add).with(
Logger::Severity::INFO,
"[DiscoveryEngine::Sync::Put] Successfully added/updated content_id:some_content_id payload_version:1",
"[DiscoveryEngine::Sync::Put] Ignored as newer version already synced content_id:some_content_id payload_version:1",
)
end
end
Expand Down
Loading

0 comments on commit 583deb5

Please sign in to comment.