From b413e1f5503b739558f4fd61f3a994ab99deb0fe Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Sat, 4 Feb 2023 19:45:42 +1100 Subject: [PATCH 01/14] chore: re-use same lock for schema and data migrations --- lib/pact_broker/app.rb | 24 ++++++++++++-------- lib/sequel/postgres_advisory_lock.rb | 33 ++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 9 deletions(-) create mode 100644 lib/sequel/postgres_advisory_lock.rb diff --git a/lib/pact_broker/app.rb b/lib/pact_broker/app.rb index 341635492..6f3391560 100644 --- a/lib/pact_broker/app.rb +++ b/lib/pact_broker/app.rb @@ -101,22 +101,28 @@ def post_configure def prepare_database logger.info "Database schema version is #{PactBroker::DB.version(configuration.database_connection)}" + lock = Sequel::PostgresAdvisoryLock.new(configuration.database_connection, :migrate, :pg_advisory_lock) if configuration.auto_migrate_db - migration_options = { allow_missing_migration_files: configuration.allow_missing_migration_files } - if PactBroker::DB.is_current?(configuration.database_connection, migration_options) - logger.info "Skipping database migrations as the latest migration has already been applied" - else - logger.info "Migrating database schema" - PactBroker::DB.run_migrations configuration.database_connection, migration_options - logger.info "Database schema version is now #{PactBroker::DB.version(configuration.database_connection)}" + lock.with_lock do + migration_options = { allow_missing_migration_files: configuration.allow_missing_migration_files } + + if PactBroker::DB.is_current?(configuration.database_connection, migration_options) + logger.info "Skipping database migrations as the latest migration has already been applied" + else + logger.info "Migrating database schema" + PactBroker::DB.run_migrations configuration.database_connection, migration_options + logger.info "Database schema version is now #{PactBroker::DB.version(configuration.database_connection)}" + end end else logger.info "Skipping database schema migrations as database auto migrate is disabled" end if configuration.auto_migrate_db_data - logger.info "Migrating data" - PactBroker::DB.run_data_migrations configuration.database_connection + lock.with_lock do + logger.info "Migrating data" + PactBroker::DB.run_data_migrations configuration.database_connection + end else logger.info "Skipping data migrations" end diff --git a/lib/sequel/postgres_advisory_lock.rb b/lib/sequel/postgres_advisory_lock.rb new file mode 100644 index 000000000..cbec07923 --- /dev/null +++ b/lib/sequel/postgres_advisory_lock.rb @@ -0,0 +1,33 @@ +module Sequel + class PostgresAdvisoryLock + def initialize(database_connection, name, type = :pg_try_advisory_lock) + @database_connection = database_connection + @name = name + @type = type + @lock_obtained = false + end + + def with_lock + if postgres? + @database_connection.extension :pg_advisory_lock + @database_connection.register_advisory_lock(@name, @type) + results = @database_connection.with_advisory_lock(@name) do + @lock_obtained = true + yield + end + results + else + @lock_obtained = true + yield + end + end + + def lock_obtained? + @lock_obtained + end + + def postgres? + @database_connection.adapter_scheme.to_s =~ /postgres/ + end + end +end From 2ce13b7b6fcc862e670dc4e884343ffdf6a7fa7e Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Sat, 4 Feb 2023 19:56:52 +1100 Subject: [PATCH 02/14] chore: only register lock once --- lib/sequel/postgres_advisory_lock.rb | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/sequel/postgres_advisory_lock.rb b/lib/sequel/postgres_advisory_lock.rb index cbec07923..290c67049 100644 --- a/lib/sequel/postgres_advisory_lock.rb +++ b/lib/sequel/postgres_advisory_lock.rb @@ -5,17 +5,18 @@ def initialize(database_connection, name, type = :pg_try_advisory_lock) @name = name @type = type @lock_obtained = false + if postgres? + @database_connection.extension :pg_advisory_lock + @database_connection.register_advisory_lock(@name, @type) + end end def with_lock if postgres? - @database_connection.extension :pg_advisory_lock - @database_connection.register_advisory_lock(@name, @type) - results = @database_connection.with_advisory_lock(@name) do + @database_connection.with_advisory_lock(@name) do @lock_obtained = true yield end - results else @lock_obtained = true yield From 743c8bfc3ef2ed0f6dce1208fb0fe9bdf1b5e1a3 Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Sun, 5 Feb 2023 13:51:31 +1100 Subject: [PATCH 03/14] chore: ensure lock is only registered once --- lib/sequel/postgres_advisory_lock.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/sequel/postgres_advisory_lock.rb b/lib/sequel/postgres_advisory_lock.rb index 290c67049..69d3cbb81 100644 --- a/lib/sequel/postgres_advisory_lock.rb +++ b/lib/sequel/postgres_advisory_lock.rb @@ -7,7 +7,9 @@ def initialize(database_connection, name, type = :pg_try_advisory_lock) @lock_obtained = false if postgres? @database_connection.extension :pg_advisory_lock - @database_connection.register_advisory_lock(@name, @type) + unless @database_connection.registered_advisory_locks.key?(@name) + @database_connection.register_advisory_lock(@name, @type) + end end end From 8ae94f060a11f8a917c66ee26f536dca90932ae2 Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Fri, 22 Mar 2024 14:09:50 +1100 Subject: [PATCH 04/14] chore: add missing require --- lib/pact_broker/app.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/pact_broker/app.rb b/lib/pact_broker/app.rb index 6f3391560..8e74ad104 100644 --- a/lib/pact_broker/app.rb +++ b/lib/pact_broker/app.rb @@ -27,6 +27,7 @@ require "pact_broker/api/authorization/resource_access_policy" require "pact_broker/api/middleware/http_debug_logs" require "pact_broker/application_context" +require "sequel/postgres_advisory_lock" module PactBroker From 1f467b83b8b1482de1cb81d8dc956b31edf507b8 Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Fri, 22 Mar 2024 14:46:05 +1100 Subject: [PATCH 05/14] chore: add gem to gemspec --- lib/pact_broker/app.rb | 30 +++++++++++++++++++----------- pact_broker.gemspec | 1 + 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/lib/pact_broker/app.rb b/lib/pact_broker/app.rb index 8e74ad104..b3e241401 100644 --- a/lib/pact_broker/app.rb +++ b/lib/pact_broker/app.rb @@ -105,15 +105,7 @@ def prepare_database lock = Sequel::PostgresAdvisoryLock.new(configuration.database_connection, :migrate, :pg_advisory_lock) if configuration.auto_migrate_db lock.with_lock do - migration_options = { allow_missing_migration_files: configuration.allow_missing_migration_files } - - if PactBroker::DB.is_current?(configuration.database_connection, migration_options) - logger.info "Skipping database migrations as the latest migration has already been applied" - else - logger.info "Migrating database schema" - PactBroker::DB.run_migrations configuration.database_connection, migration_options - logger.info "Database schema version is now #{PactBroker::DB.version(configuration.database_connection)}" - end + ensure_all_database_migrations_are_applied end else logger.info "Skipping database schema migrations as database auto migrate is disabled" @@ -121,8 +113,7 @@ def prepare_database if configuration.auto_migrate_db_data lock.with_lock do - logger.info "Migrating data" - PactBroker::DB.run_data_migrations configuration.database_connection + run_data_migrations end else logger.info "Skipping data migrations" @@ -132,6 +123,23 @@ def prepare_database PactBroker::Webhooks::Service.fail_retrying_triggered_webhooks end + def ensure_all_database_migrations_are_applied + migration_options = { allow_missing_migration_files: configuration.allow_missing_migration_files } + + if PactBroker::DB.is_current?(configuration.database_connection, migration_options) + logger.info "Skipping database migrations as the latest migration has already been applied" + else + logger.info "Migrating database schema" + PactBroker::DB.run_migrations(configuration.database_connection, migration_options) + logger.info "Database schema version is now #{PactBroker::DB.version(configuration.database_connection)}" + end + end + + def run_data_migrations + logger.info "Migrating data" + PactBroker::DB.run_data_migrations(configuration.database_connection) + end + def load_configuration_from_database configuration.load_from_database! end diff --git a/pact_broker.gemspec b/pact_broker.gemspec index bb28101bf..32a714de5 100644 --- a/pact_broker.gemspec +++ b/pact_broker.gemspec @@ -73,4 +73,5 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "anyway_config", "~> 2.1" gem.add_runtime_dependency "request_store", "~> 1.5" gem.add_runtime_dependency "moments", "~> 0.2" + gem.add_runtime_dependency "sequel-pg_advisory_lock", "~>0.1" end From 0220040e93aef146aa5bd467cdcb164a42e28a6a Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Mon, 25 Mar 2024 11:36:19 +1100 Subject: [PATCH 06/14] feat: add lock to clean task --- lib/pact_broker/tasks/clean_task.rb | 93 ++++++++++++++++++++-------- lib/sequel/postgres_advisory_lock.rb | 43 +++++++++++-- script/docker/db-migrate.sh | 3 + 3 files changed, 107 insertions(+), 32 deletions(-) create mode 100755 script/docker/db-migrate.sh diff --git a/lib/pact_broker/tasks/clean_task.rb b/lib/pact_broker/tasks/clean_task.rb index d356a8c5a..4f094369c 100644 --- a/lib/pact_broker/tasks/clean_task.rb +++ b/lib/pact_broker/tasks/clean_task.rb @@ -1,3 +1,8 @@ +# This task is used to clean up old data in a Pact Broker database +# to stop performance issues from slowing down responses when there is +# too much data. +# See https://docs.pact.io/pact_broker/administration/maintenance + module PactBroker module DB class CleanTask < ::Rake::TaskLib @@ -7,11 +12,13 @@ class CleanTask < ::Rake::TaskLib attr_accessor :version_deletion_limit attr_accessor :logger attr_accessor :dry_run + attr_accessor :use_lock # allow disabling of postgres lock if it is causing problems def initialize &block require "pact_broker/db/clean_incremental" @version_deletion_limit = 1000 @dry_run = false + @use_lock = true @keep_version_selectors = PactBroker::DB::CleanIncremental::DEFAULT_KEEP_SELECTORS rake_task(&block) end @@ -28,42 +35,76 @@ def rake_task &block namespace :db do desc "Clean unnecessary pacts and verifications from database" task :clean do | _t, _args | - instance_eval(&block) - require "pact_broker/db/clean_incremental" - require "pact_broker/error" - require "yaml" - require "benchmark" + with_lock do + perform_clean + end + end + end + end + end + + def perform_clean + require "pact_broker/db/clean_incremental" + require "pact_broker/error" + require "yaml" + require "benchmark" - raise PactBroker::Error.new("You must specify the version_deletion_limit") unless version_deletion_limit + raise PactBroker::Error.new("You must specify the version_deletion_limit") unless version_deletion_limit - prefix = dry_run ? "[DRY RUN] " : "" + if keep_version_selectors.nil? || keep_version_selectors.empty? + raise PactBroker::Error.new("You must specify which versions to keep") + else + add_defaults_to_keep_selectors + output "Deleting oldest #{version_deletion_limit} versions, keeping versions that match the configured selectors", keep_version_selectors.collect(&:to_hash) + end - if keep_version_selectors.nil? || keep_version_selectors.empty? - raise PactBroker::Error.new("You must specify which versions to keep") - else - add_defaults_to_keep_selectors - output "#{prefix}Deleting oldest #{version_deletion_limit} versions, keeping versions that match the configured selectors", keep_version_selectors.collect(&:to_hash) - end + start_time = Time.now + results = PactBroker::DB::CleanIncremental.call(database_connection, + keep: keep_version_selectors, + limit: version_deletion_limit, + logger: logger, + dry_run: dry_run + ) + end_time = Time.now + elapsed_seconds = (end_time - start_time).to_i + output "Results (#{elapsed_seconds} seconds)", results + end - start_time = Time.now - results = PactBroker::DB::CleanIncremental.call(database_connection, - keep: keep_version_selectors, - limit: version_deletion_limit, - logger: logger, - dry_run: dry_run - ) - end_time = Time.now - elapsed_seconds = (end_time - start_time).to_i - output "Results (#{elapsed_seconds} seconds)", results - end + # Use a Postgres advisory lock to ensure that only one clean can run at a time. + # This allows a cron schedule to be used on the Pact Broker Docker image when deployed + # on a multi-instance architecture, without all the instances stepping on each other's toes. + # + # Any tasks that attempt to run while a clean job is running will skip the clean + # and exit with a message and a success code. + # + # To test that the lock works, run: + # script/docker/db-start.sh + # script/docker/db-migrate.sh + # for i in {0..3}; do PACT_BROKER_TEST_DATABASE_URL=postgres://postgres:postgres@localhost/postgres bundle exec rake pact_broker:db:clean &; done; + # + # There will be 3 messages saying "Clean was not performed" and output from one thread showing the clean is being done. + def with_lock + if use_lock + require "sequel/postgres_advisory_lock" + + lock = Sequel::PostgresAdvisoryLock.new(database_connection, :clean, :pg_try_advisory_lock) + results = lock.with_lock do + yield + end + + if !lock.lock_obtained? + output("Clean was not performed as a clean is already in progress. Exiting.") end + else + yield end end - def output string, payload = {} - logger ? logger.info(string, payload) : puts("#{string} #{payload.to_json}") + def output(string, payload = {}) + prefix = dry_run ? "[DRY RUN] " : "" + logger ? logger.info("#{prefix}#{string}") : puts("#{prefix}#{string} #{payload.to_json}") end def add_defaults_to_keep_selectors diff --git a/lib/sequel/postgres_advisory_lock.rb b/lib/sequel/postgres_advisory_lock.rb index 69d3cbb81..3520dd417 100644 --- a/lib/sequel/postgres_advisory_lock.rb +++ b/lib/sequel/postgres_advisory_lock.rb @@ -1,25 +1,41 @@ +require "pact_broker/logging" + +# Docs on lock types from From https://www.postgresql.org/docs/9.1/functions-admin.html +# +# pg_advisory_lock locks an application-defined resource. +# If another session already holds a lock on the same resource identifier, this function will wait until the +# resource becomes available. +# The lock is exclusive. +# Multiple lock requests stack, so that if the same resource is locked three times it must then be unlocked three +# times to be released for other sessions' use. + +# pg_advisory_lock_shared works the same as pg_advisory_lock, except the lock can be shared with other sessions +# requesting shared locks. Only would-be exclusive lockers are locked out. + +# pg_try_advisory_lock is similar to pg_advisory_lock, except the function will not wait for the lock to become available. +# It will either obtain the lock immediately and return true, or return false if the lock cannot be acquired immediately. + module Sequel class PostgresAdvisoryLock + include PactBroker::Logging + def initialize(database_connection, name, type = :pg_try_advisory_lock) @database_connection = database_connection @name = name @type = type @lock_obtained = false - if postgres? - @database_connection.extension :pg_advisory_lock - unless @database_connection.registered_advisory_locks.key?(@name) - @database_connection.register_advisory_lock(@name, @type) - end - end + register_advisory_lock if postgres? end def with_lock if postgres? @database_connection.with_advisory_lock(@name) do + logger.debug("Lock #{@name} obtained") @lock_obtained = true yield end else + logger.debug("Executing without lock as this is not a postgres database") @lock_obtained = true yield end @@ -29,8 +45,23 @@ def lock_obtained? @lock_obtained end + private + def postgres? @database_connection.adapter_scheme.to_s =~ /postgres/ end + + + def register_advisory_lock + @database_connection.extension :pg_advisory_lock + unless @database_connection.registered_advisory_locks.key?(@name) + logger.debug("Registering postgres lock of type #{@type} with name #{@name}") + begin + @database_connection.register_advisory_lock(@name, @type) + rescue Sequel::Error => e + logger.info(e.message) + end + end + end end end diff --git a/script/docker/db-migrate.sh b/script/docker/db-migrate.sh new file mode 100755 index 000000000..5be8db83e --- /dev/null +++ b/script/docker/db-migrate.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +PACT_BROKER_TEST_DATABASE_URL=postgres://postgres:postgres@localhost/postgres bundle exec rake pact_broker:db:migrate \ No newline at end of file From 1d85a52b55eb21fe163e46fb1c8b4a2e79730e6b Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Mon, 25 Mar 2024 11:57:01 +1100 Subject: [PATCH 07/14] chore: copy in pg advisory lock extension so that the error can be customised when duplicate lock happens because multiple threads --- lib/sequel/extensions/pg_advisory_lock.rb | 96 +++++++++++++++++++++++ lib/sequel/postgres_advisory_lock.rb | 15 +++- pact_broker.gemspec | 1 - 3 files changed, 110 insertions(+), 2 deletions(-) create mode 100644 lib/sequel/extensions/pg_advisory_lock.rb diff --git a/lib/sequel/extensions/pg_advisory_lock.rb b/lib/sequel/extensions/pg_advisory_lock.rb new file mode 100644 index 000000000..0a3e645e0 --- /dev/null +++ b/lib/sequel/extensions/pg_advisory_lock.rb @@ -0,0 +1,96 @@ +# Copied with thanks from https://github.com/yuryroot/sequel-pg_advisory_lock +# The reason this is copy/pasted is that I wanted to customise the error raised +# when the lock was already registered, and it didn't seem worth going through a +# full PR process and getting the gem republished. + +require 'sequel' +require 'zlib' + +module Sequel + module Postgres + module PgAdvisoryLock + + SESSION_LEVEL_LOCKS = [ + :pg_advisory_lock, + :pg_try_advisory_lock + ].freeze + + TRANSACTION_LEVEL_LOCKS = [ + :pg_advisory_xact_lock, + :pg_try_advisory_xact_lock + ].freeze + + LOCK_FUNCTIONS = (SESSION_LEVEL_LOCKS + TRANSACTION_LEVEL_LOCKS).freeze + + DEFAULT_LOCK_FUNCTION = :pg_advisory_lock + UNLOCK_FUNCTION = :pg_advisory_unlock + + class LockAlreadyRegistered < Sequel::Error; end + + def registered_advisory_locks + @registered_advisory_locks ||= {} + end + + def with_advisory_lock(name, id = nil, &block) + options = registered_advisory_locks.fetch(name.to_sym) + + lock_key = options.fetch(:key) + function_params = [lock_key, id].compact + + lock_function = options.fetch(:lock_function) + transaction_level_lock = TRANSACTION_LEVEL_LOCKS.include?(lock_function) + + if transaction_level_lock + # TODO: It's allowed to specify additional options (in particular, :server) + # while opening database transaction. + # That's why this check must be smarter. + unless in_transaction? + raise Error, "Transaction must be manually opened before using transaction level lock '#{lock_function}'" + end + + if get(Sequel.function(lock_function, *function_params)) + yield + end + else + synchronize do + if get(Sequel.function(lock_function, *function_params)) + begin + result = yield + ensure + get(Sequel.function(UNLOCK_FUNCTION, *function_params)) + result + end + end + end + end + end + + def register_advisory_lock(name, lock_function = DEFAULT_LOCK_FUNCTION) + name = name.to_sym + + if registered_advisory_locks.key?(name) + raise LockAlreadyRegistered, "Lock with name :#{name} is already registered" + end + + key = advisory_lock_key_for(name) + if registered_advisory_locks.values.any? { |opts| opts.fetch(:key) == key } + raise Error, "Lock key #{key} is already taken" + end + + function = lock_function.to_sym + unless LOCK_FUNCTIONS.include?(function) + raise Error, "Invalid lock function :#{function}" + end + + registered_advisory_locks[name] = { key: key, lock_function: function } + end + + def advisory_lock_key_for(lock_name) + Zlib.crc32(lock_name.to_s) % 2 ** 31 + end + + end + end + + Database.register_extension(:pg_advisory_lock, Postgres::PgAdvisoryLock) +end diff --git a/lib/sequel/postgres_advisory_lock.rb b/lib/sequel/postgres_advisory_lock.rb index 3520dd417..4d75ac1c5 100644 --- a/lib/sequel/postgres_advisory_lock.rb +++ b/lib/sequel/postgres_advisory_lock.rb @@ -15,6 +15,19 @@ # pg_try_advisory_lock is similar to pg_advisory_lock, except the function will not wait for the lock to become available. # It will either obtain the lock immediately and return true, or return false if the lock cannot be acquired immediately. +# There are race conditions with the in-memory lock registry that may cause problems when running multiple threads. +# To handle this, the code catches and ignores the error Sequel::Postgres::PgAdvisoryLock::LockAlreadyRegistered + +# To test the register_advisory_lock method +# +# threads = [] +# 4.times { +# threads << Thread.new { Sequel::PostgresAdvisoryLock.new(connection, :clean) } +# } +# threads.each(&:join) +# +# Should see the output "lock already registered" +# module Sequel class PostgresAdvisoryLock include PactBroker::Logging @@ -58,7 +71,7 @@ def register_advisory_lock logger.debug("Registering postgres lock of type #{@type} with name #{@name}") begin @database_connection.register_advisory_lock(@name, @type) - rescue Sequel::Error => e + rescue Sequel::Postgres::PgAdvisoryLock::LockAlreadyRegistered => e logger.info(e.message) end end diff --git a/pact_broker.gemspec b/pact_broker.gemspec index 32a714de5..bb28101bf 100644 --- a/pact_broker.gemspec +++ b/pact_broker.gemspec @@ -73,5 +73,4 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "anyway_config", "~> 2.1" gem.add_runtime_dependency "request_store", "~> 1.5" gem.add_runtime_dependency "moments", "~> 0.2" - gem.add_runtime_dependency "sequel-pg_advisory_lock", "~>0.1" end From b34dc6d2bab638cb5eaeb50b9b0fa4557ee50ac1 Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Tue, 26 Mar 2024 11:38:04 +1100 Subject: [PATCH 08/14] chore: copy tests for pg_advisory_lock --- lib/sequel/extensions/pg_advisory_lock.rb | 22 ++-- .../lib/sequel/extensions/lock_names_keys.yml | 100 ++++++++++++++++++ .../extensions/register_advisory_lock_test.rb | 89 ++++++++++++++++ 3 files changed, 202 insertions(+), 9 deletions(-) create mode 100644 spec/lib/sequel/extensions/lock_names_keys.yml create mode 100644 spec/lib/sequel/extensions/register_advisory_lock_test.rb diff --git a/lib/sequel/extensions/pg_advisory_lock.rb b/lib/sequel/extensions/pg_advisory_lock.rb index 0a3e645e0..97cac1da8 100644 --- a/lib/sequel/extensions/pg_advisory_lock.rb +++ b/lib/sequel/extensions/pg_advisory_lock.rb @@ -1,10 +1,12 @@ -# Copied with thanks from https://github.com/yuryroot/sequel-pg_advisory_lock -# The reason this is copy/pasted is that I wanted to customise the error raised -# when the lock was already registered, and it didn't seem worth going through a -# full PR process and getting the gem republished. +# Copied with thanks from https://github.com/yuryroot/sequel-pg_advisory_lock/blob/d7509aa/lib/sequel/extensions/pg_advisory_lock.rb +# The reason this is copy/pasted and modified is that I wanted to allow exact duplicate +# locks to be registered because different threads running the same code +# should not cause a Sequel::Error to be raised. +# Also, I wanted it to use Concurrent::Hash for multi-threaded environments. require 'sequel' require 'zlib' +require 'concurrent/hash' module Sequel module Postgres @@ -28,7 +30,7 @@ module PgAdvisoryLock class LockAlreadyRegistered < Sequel::Error; end def registered_advisory_locks - @registered_advisory_locks ||= {} + @registered_advisory_locks ||= Concurrent::Hash.new end def with_advisory_lock(name, id = nil, &block) @@ -65,15 +67,18 @@ def with_advisory_lock(name, id = nil, &block) end end + # Beth: not sure what extra value this registration provides + # It's just turning the name into a number, and making sure the name/number is unique. def register_advisory_lock(name, lock_function = DEFAULT_LOCK_FUNCTION) name = name.to_sym - if registered_advisory_locks.key?(name) - raise LockAlreadyRegistered, "Lock with name :#{name} is already registered" + if registered_advisory_locks.key?(name) && registered_advisory_locks[name][:lock_function] != lock_function + raise LockAlreadyRegistered, "Lock with name :#{name} is already registered with a different lock function (#{registered_advisory_locks[name][:lock_function]})" end key = advisory_lock_key_for(name) - if registered_advisory_locks.values.any? { |opts| opts.fetch(:key) == key } + name_for_key = registered_advisory_locks.keys.find { |name| registered_advisory_locks[name].fetch(:key) == key } + if name_for_key && name_for_key != name raise Error, "Lock key #{key} is already taken" end @@ -88,7 +93,6 @@ def register_advisory_lock(name, lock_function = DEFAULT_LOCK_FUNCTION) def advisory_lock_key_for(lock_name) Zlib.crc32(lock_name.to_s) % 2 ** 31 end - end end diff --git a/spec/lib/sequel/extensions/lock_names_keys.yml b/spec/lib/sequel/extensions/lock_names_keys.yml new file mode 100644 index 000000000..3893a6d13 --- /dev/null +++ b/spec/lib/sequel/extensions/lock_names_keys.yml @@ -0,0 +1,100 @@ +--- +5b25e9d56cf531395d7eb91474acf01c600235d24090ecbf107a: 922798582 +9fb20fc10a71bc70f7556ee2cb2d9fd73a14d4ea21533cb57f4e6a84d0b7a4d247f7c64fa09aeb750ef5fe51bf9d: 897625924 +45780a5ee58571f5a2feac03913dd4597ed5bcbafb357231f1e6bc9e40b02afc45f219200f8ff6bbcb: 1473933586 +54afaac7d7c7ecc0734c301c11a26adc7ce2bc053024: 55786877 +41b3170b207e66b1eae1a84a03db26a45ffad81c013c81c388c1ac0a: 483987601 +316c92: 1995393960 +01213c6faa1234eaef8d5ee0b927ddd8605a8de4f4d720c444e096722e6c8767595ef3820526: 1182101158 +d670c662978e4c5ece07962b93053352b169f3d4b4545b53d0857ab8cb4b2d2496e4f4536d5c0c7108e72de62b3b26bc: 1119328736 +287ec6811624273828d5cbdc3ce6: 365621557 +3ab555ecc1b687ea68b7966637cb8b6aae6ea087f573f1f54c63f9b6191cf5985794ef1fc2: 1637375338 +4922ba19c153457e4077e12b6f42c049e9fe2888e4c263357a9f6e43: 1039229035 +8ba8cf151a952b8e86acc8397bcbc6f6c0f22d7b2d09de2977c6de095e7d: 76296185 +9a2aad22a067750f65336ad874f28f6b8fe6660fffb9abee57a52ea856e9db84346363401d3082e3cc38d1: 2012818523 +1df37db5ca4759a41cab515d4f8c7be4864c76b647fa60a8397ae72bd2: 511628245 +448501e579e9cf595961317530eecd55b32559d1b9c889b06440e9903cf5c0b6e6f648aa6304e451: 1570896729 +e595ee62d9e5253dc404615149ef5ce7: 1194816858 +038aca8adbfa2379e2785daf82adb536234acd9606ca4b3c3a: 732478070 +6eb1: 1007515829 +73653d90fc4a86080ecc3aa7595dccbf29c642cfebadb5e3402f113b9237b48027079324f4c1ae: 660466134 +3d500f41c70beef0184fc70ef0f6c462c45071bc2f68fdce97e607418d55813db7ce926194776c5be6: 220327056 +d51387ab1a5b20d1160b063f073dc2853faa7c05da11e5bd0f6c26b1210c718bb7f1f6cfd4a55d6dc8ff: 182882809 +9a13a82a50433497728dc2edb2: 666506043 +afd607619128a8a18471a6c64456eeb947187c8d8c9d831d319c22e3ca91: 1585000330 +88758d3f4db23cefae7e9c1ed1d1da: 353123979 +589ef5e96046a05a08f073e8715d94d932a2343b8c: 241791474 +a15059545c772be2fa34123b9d8a91ff9065232eb23163f5e3b406f5a1617024d36efe06b5a6735d71c5: 501631426 +12fa2ef6582fe8c237ad87124f3c5230d2cd: 182175882 +56328a1fa9ff3a7ffbbeca7bd6b2b3128d5bc87bc9cf: 78183177 +4f72: 1250424524 +78b5b7520420abd83de7d4d8a04b1d778b4879cffbb9037d9a8352c9e8: 1441815315 +6136fb4240624b722118a06eece42a5d9a2c: 1751735639 +3091db355a8bf8ed0a5e: 456038127 +f96dcc2843cafdd4a34c780e1a01338ecda8fcd6655a3d796f243194a5dc63da83541971f1cd5eb7ba35: 212906644 +c0e650b6139390b0c145b71fe8ee: 1717118197 +ccb8845ac4b34d23be99: 414287445 +'49': 636905728 +303ff5f54e: 539528939 +478b8f89622f42d3e52337ef4466202ac98a3f1376924075db75ffcb752655d5e8420468022d2ac1: 40606646 +f599f96da671fc62cefa106dfeffa25e2d68f48e6027087ba6870a9d0393970b3920be: 1567333467 +3cb95e3bf12bde6957569d0dca890a22cd5e4afa94f2d15b1e61e01d381508fc0537fcb2eb: 1862269908 +70da2cacbdc197ce12be3d3eb480fe01a13ebcff83d26ce98e49d2: 128376115 +5c253269f8952c0e2676: 1782203278 +0bcdd137e90916568b: 2039072251 +af0872c2f276902384779c81cf3febfdc72b2566e42aba3aba2738ec8102d693b7032ebc239a8b9471602fa50f1d488b1979: 459687685 +a9f1f80b4b9faac29173d8beef5220e4ad9993f05be5f950245e4b0035a98f58d830defce2dc: 1696241422 +d2ee666571ece020bd8137c39580b1abc9fb4a89991ce0aa7822b5d960d3e8ca09: 91351839 +3fdda8adf582eaf51ea40b5b9122515502612470c2bb66a9c5558201e2c84694a8cbe092fcde6a6dd942dc5eb8ede08b: 656571790 +ae: 15195598 +269741a9d8b944cb815bc9fd6db15d3f5106b7811541043a7448a32b7d8f17: 974353071 +603770f07d4571f83f2974aae0be42041c18e2be7ce0138df8d0: 1366051146 +e07d82b7f9cd8c5fdc: 874617480 +8d83a1f774bcd95a8aae: 1687774301 +ea5f6d68f75dbc5fdddffc2752f33ce4cfdcc5aeb5ce03851e61a61c6724d55ff4dc66442a: 1377712884 +93b0bfedc1910576f173b5646490474d2042a2387f6279416e7faa7df15aa56ea0da4cd8416a: 154691814 +bedea59011b243b84bda7466c37e88df8b82: 694569716 +7c79b01e18f95a30cd57b9fc: 683762260 +5ab349cb2b167c9760365e0aac391c5496d76ae7: 1689745541 +438328ca1f61a489cd6e: 1219960715 +969b44ec30748b198eda45256b3de53cff82485e5bc3f107c23b009440fe2fa40d1e8bd7aa: 1151588521 +e374274d01c6d3a83cc4: 1515901746 +ea9533b35a1c83ba4e259c5a56e36b8a32c632408210ddef474e: 245084129 +6a6b1b7b0f427cfbc2afbd9b7ec3: 1838557419 +7afe477c5fbc7d0ef167bec6b0309fb2996f172e06b176: 210109733 +189ed9a4acf0d419045b67dec4: 399970956 +aa9f60bd2ff23370628570a6f9cb7a57e06509920e3cbf8a: 1050621436 +daf9a25a8fe0775b99e2e394e03e95da53dc59403edf08ecbec72c4491411fabdb5f4410049c46f919fe48ad9dc6d6: 935832483 +7299937593973d8b988198116f77b0c41f8b80009d1755604925783aee66bdc5ad0b6a81dcc4c7763819dce5201c: 38658330 +254dd86414a411e4dd5bd74a21dfbebf397fdcfa56c802: 56596741 +42a11ea77233f9115bbd5e2b12a7fa171fdb05fbe511d67d94000e9724e17608cc798a1afe: 727298738 +2f79708c54e9e2f16dd82f57789c: 1608939254 +1fdafdbf1129dbe52b4e6570e120cc2dac025b42c9a486bd6b: 1938076612 +6b532b8d568c44711b7f918c2e360781600142de1b1164a32297d52fd7ff65e985360348b90201ab: 347982673 +a65245aeee7900178b1bbc3cc87191e72db441cfd5b49f400763982d906e32ab92b4de417f: 1991069376 +d839d6da17be0b45a1734153a1b5f8e5d02ea0c28ff299234526b0ccb15c72f5775dff8dd7: 1487367662 +2156419fc3adc5178b6d679619f08da1a90fb8f4500c5be52cf840428f1229884032f3fcd242e58e: 548015352 +efc22626e5621243: 1359916413 +04ed6ee27eb54a7bb15edb1f45fdc7035b1bbcaabeac8213d5bc9dacbc0d8c70: 1585988197 +'7199': 948999441 +0a0472f5087f23a623b5: 454820883 +6e4a104a6eb44bc3a6bf1107c06058ac9ca9fc7288e73cfad82b98d66feafc06df182ac732a1d11f9feec8: 1024970306 +42e22b983db5ec1fcd3a945d2fae53: 1649388059 +749ae1c0355da123926654c40885aab25eb5: 1046362064 +1dc4a8c65d67aa3863d6f2: 1072437948 +4e: 1194030303 +bd7fc32f3e1bdcefed3ebd8359b2eea1fc8b25486609aa86d87abe0e37aaa0: 2116128184 +c5aadfa7468e6a82f99c62876a1047bfebba9d36b4d4f97aff51b2540db8: 1415072913 +c450: 794638503 +8796f9ddabaf101ffb34542bd424bcc8a2b47822bc4b2010cc5e8799d19e3e7e438618: 1744306497 +7ce00be84fd51986bfb7faec522c7eb2c3e1a4e084d5: 1088708292 +89a600cf778d1ad0713e28cc660c6756507b87a4e6f347d430f333623f: 695948817 +c50b8ae10dcc2e29: 1193146886 +424793cb6fc0c9cb653d2bdd9248: 1395788399 +c979e881144c0c33b9: 1933771153 +185d70e249be25267445fb3ba544898e277384d09feaf021e89b44e1a01ba6eb743c: 1568080655 +3cc0192772e43af59ea642542a5cf830584ed6efb2122b66: 28535105 +6f8db868686de7c175: 277293610 +30985c01c3a94f39f4616be2a013a95b94b5b4f0a302f1595b8f589cce6da8: 1741848048 +d907ab072c35fd33: 2141142045 +7e8e3d3587c4adaa1c4bcb5b78b7c5c337c4b4492f4a071283fd649c47cd7e: 1010112831 diff --git a/spec/lib/sequel/extensions/register_advisory_lock_test.rb b/spec/lib/sequel/extensions/register_advisory_lock_test.rb new file mode 100644 index 000000000..421be02dc --- /dev/null +++ b/spec/lib/sequel/extensions/register_advisory_lock_test.rb @@ -0,0 +1,89 @@ +require "sequel/extensions/pg_advisory_lock" + +describe Sequel::Postgres::PgAdvisoryLock do + subject { Sequel::Model.db } + + describe '#register_advisory_lock' do + let(:supported_lock_functions) do + [ + :pg_advisory_lock, + :pg_try_advisory_lock, + :pg_advisory_xact_lock, + :pg_try_advisory_xact_lock + ] + end + + let(:default_lock_function) { :pg_advisory_lock } + + before :all do + Sequel::Model.db.extension(:pg_advisory_lock) + end + + before do + subject.registered_advisory_locks.clear + end + + it 'base check' do + lock_name = :test_lock + + expect(subject.registered_advisory_locks[lock_name]).to be nil + subject.register_advisory_lock(lock_name) + expect(default_lock_function).to eq subject.registered_advisory_locks[lock_name].fetch(:lock_function) + end + + it 'should register locks for all supported PostgreSQL functions' do + supported_lock_functions.each do |lock_function| + lock_name = "#{lock_function}_test".to_sym + + expect(subject.registered_advisory_locks[lock_name]).to be nil + subject.register_advisory_lock(lock_name, lock_function) + expect(lock_function).to eq subject.registered_advisory_locks[lock_name].fetch(:lock_function) + end + end + + it 'should prevent specifying not supported PostgreSQL function as lock type' do + lock_name = :not_supported_lock_function_test + lock_function = :not_supported_lock_function + + expect { subject.register_advisory_lock(lock_name, lock_function) }.to raise_error(Sequel::Error, /Invalid lock function/) + end + + it 'should prevent registering multiple locks with same name and different functions' do + lock_name = :multiple_locks_with_same_name_test + subject.register_advisory_lock(lock_name, supported_lock_functions[0]) + + expect { subject.register_advisory_lock(lock_name, supported_lock_functions[1]) }.to raise_error(Sequel::Error, /Lock with name .+ is already registered/) + end + + it 'should allow registering multiple locks with same name and same functions' do + lock_name = :multiple_locks_with_same_name_test + subject.register_advisory_lock(lock_name, supported_lock_functions[0]) + + expect { subject.register_advisory_lock(lock_name, supported_lock_functions[0]) }.to_not raise_error + end + + it 'registered locks must have different lock keys' do + quantity = 100 + quantity.times do |index| + lock_name = "test_lock_#{index}".to_sym + subject.register_advisory_lock(lock_name) + end + + expect(quantity).to eq subject.registered_advisory_locks.size + all_keys = subject.registered_advisory_locks.values.map { |v| v.fetch(:key) } + expect(all_keys.size).to eq all_keys.uniq.size + end + + it 'mapping between lock name and lock key must be constant' do + expect(subject.registered_advisory_locks).to be_empty + + lock_names_keys_mapping = YAML.load_file(File.join(File.dirname(__FILE__), 'lock_names_keys.yml')) + + lock_names_keys_mapping.each do |lock_name, valid_lock_key| + lock_name = lock_name.to_sym + subject.register_advisory_lock(lock_name) + expect(valid_lock_key).to eq subject.registered_advisory_locks[lock_name].fetch(:key) + end + end + end +end From 716189ae55d7e52ccf20b5eac658ac16c8dbe613 Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Tue, 26 Mar 2024 11:40:38 +1100 Subject: [PATCH 09/14] docs: comments --- lib/sequel/extensions/pg_advisory_lock.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/sequel/extensions/pg_advisory_lock.rb b/lib/sequel/extensions/pg_advisory_lock.rb index 97cac1da8..f1ff37bc4 100644 --- a/lib/sequel/extensions/pg_advisory_lock.rb +++ b/lib/sequel/extensions/pg_advisory_lock.rb @@ -67,8 +67,9 @@ def with_advisory_lock(name, id = nil, &block) end end - # Beth: not sure what extra value this registration provides - # It's just turning the name into a number, and making sure the name/number is unique. + # Beth: not sure how much extra value this registration provides. + # It turns the name into a number, and makes sure the name/number is unique, + # and that you don't try and use a different lock function with the same name. def register_advisory_lock(name, lock_function = DEFAULT_LOCK_FUNCTION) name = name.to_sym From 0c840d875c68cb0ca00198bddb6393f5c4e6dca7 Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Tue, 26 Mar 2024 11:57:00 +1100 Subject: [PATCH 10/14] refactor: move lock file into pact broker module --- lib/pact_broker/app.rb | 4 +- lib/pact_broker/db/advisory_lock.rb | 74 +++++++++++++++++++++++++ lib/pact_broker/tasks/clean_task.rb | 4 +- lib/sequel/postgres_advisory_lock.rb | 80 ---------------------------- 4 files changed, 78 insertions(+), 84 deletions(-) create mode 100644 lib/pact_broker/db/advisory_lock.rb delete mode 100644 lib/sequel/postgres_advisory_lock.rb diff --git a/lib/pact_broker/app.rb b/lib/pact_broker/app.rb index b3e241401..2f6be0006 100644 --- a/lib/pact_broker/app.rb +++ b/lib/pact_broker/app.rb @@ -27,7 +27,7 @@ require "pact_broker/api/authorization/resource_access_policy" require "pact_broker/api/middleware/http_debug_logs" require "pact_broker/application_context" -require "sequel/postgres_advisory_lock" +require "pact_broker/db/advisory_lock" module PactBroker @@ -102,7 +102,7 @@ def post_configure def prepare_database logger.info "Database schema version is #{PactBroker::DB.version(configuration.database_connection)}" - lock = Sequel::PostgresAdvisoryLock.new(configuration.database_connection, :migrate, :pg_advisory_lock) + lock = PactBroker::DB::AdvisoryLock.new(configuration.database_connection, :migrate, :pg_advisory_lock) if configuration.auto_migrate_db lock.with_lock do ensure_all_database_migrations_are_applied diff --git a/lib/pact_broker/db/advisory_lock.rb b/lib/pact_broker/db/advisory_lock.rb new file mode 100644 index 000000000..921d6d4a3 --- /dev/null +++ b/lib/pact_broker/db/advisory_lock.rb @@ -0,0 +1,74 @@ +require "pact_broker/logging" + +# Uses a Postgres advisory lock to ensure that a given block of code can only have ONE +# thread in excution at a time against a given database. +# When the database is not Postgres, the block will yield without any locks, allowing +# this class to be used safely with other database types, but without the locking +# functionality. +# +# This is a wrapper around the actual implementation code in the Sequel extension from https://github.com/yuryroot/sequel-pg_advisory_lock +# which was copied into this codebase and modified for usage in this codebase. +# +# Docs on lock types from From https://www.postgresql.org/docs/9.1/functions-admin.html +# +# pg_advisory_lock locks an application-defined resource. +# If another session already holds a lock on the same resource identifier, this function will wait until the +# resource becomes available. +# The lock is exclusive. +# Multiple lock requests stack, so that if the same resource is locked three times it must then be unlocked three +# times to be released for other sessions' use. + +# pg_advisory_lock_shared works the same as pg_advisory_lock, except the lock can be shared with other sessions +# requesting shared locks. Only would-be exclusive lockers are locked out. + +# pg_try_advisory_lock is similar to pg_advisory_lock, except the function will not wait for the lock to become available. +# It will either obtain the lock immediately and return true, or return false if the lock cannot be acquired immediately. + +# There are race conditions with the in-memory lock registry that may cause problems when running multiple threads. +# To handle this, the code catches and ignores the error Sequel::Postgres::PgAdvisoryLock::LockAlreadyRegistered + +module PactBroker + module DB + class AdvisoryLock + include PactBroker::Logging + + def initialize(database_connection, name, type = :pg_try_advisory_lock) + @database_connection = database_connection + @name = name + @type = type + @lock_obtained = false + register_advisory_lock if postgres? + end + + def with_lock + if postgres? + @database_connection.with_advisory_lock(@name) do + logger.debug("Lock #{@name} obtained") + @lock_obtained = true + yield + end + else + logger.warn("Executing block without lock as this is not a Postgres database") + @lock_obtained = true + yield + end + end + + def lock_obtained? + @lock_obtained + end + + private + + def postgres? + @database_connection.adapter_scheme.to_s =~ /postgres/ + end + + + def register_advisory_lock + @database_connection.extension :pg_advisory_lock + @database_connection.register_advisory_lock(@name, @type) + end + end + end +end diff --git a/lib/pact_broker/tasks/clean_task.rb b/lib/pact_broker/tasks/clean_task.rb index 4f094369c..809793440 100644 --- a/lib/pact_broker/tasks/clean_task.rb +++ b/lib/pact_broker/tasks/clean_task.rb @@ -87,9 +87,9 @@ def perform_clean # There will be 3 messages saying "Clean was not performed" and output from one thread showing the clean is being done. def with_lock if use_lock - require "sequel/postgres_advisory_lock" + require "pact_broker/db/advisory_lock" - lock = Sequel::PostgresAdvisoryLock.new(database_connection, :clean, :pg_try_advisory_lock) + lock = PactBroker::DB::AdvisoryLock.new(database_connection, :clean, :pg_try_advisory_lock) results = lock.with_lock do yield end diff --git a/lib/sequel/postgres_advisory_lock.rb b/lib/sequel/postgres_advisory_lock.rb deleted file mode 100644 index 4d75ac1c5..000000000 --- a/lib/sequel/postgres_advisory_lock.rb +++ /dev/null @@ -1,80 +0,0 @@ -require "pact_broker/logging" - -# Docs on lock types from From https://www.postgresql.org/docs/9.1/functions-admin.html -# -# pg_advisory_lock locks an application-defined resource. -# If another session already holds a lock on the same resource identifier, this function will wait until the -# resource becomes available. -# The lock is exclusive. -# Multiple lock requests stack, so that if the same resource is locked three times it must then be unlocked three -# times to be released for other sessions' use. - -# pg_advisory_lock_shared works the same as pg_advisory_lock, except the lock can be shared with other sessions -# requesting shared locks. Only would-be exclusive lockers are locked out. - -# pg_try_advisory_lock is similar to pg_advisory_lock, except the function will not wait for the lock to become available. -# It will either obtain the lock immediately and return true, or return false if the lock cannot be acquired immediately. - -# There are race conditions with the in-memory lock registry that may cause problems when running multiple threads. -# To handle this, the code catches and ignores the error Sequel::Postgres::PgAdvisoryLock::LockAlreadyRegistered - -# To test the register_advisory_lock method -# -# threads = [] -# 4.times { -# threads << Thread.new { Sequel::PostgresAdvisoryLock.new(connection, :clean) } -# } -# threads.each(&:join) -# -# Should see the output "lock already registered" -# -module Sequel - class PostgresAdvisoryLock - include PactBroker::Logging - - def initialize(database_connection, name, type = :pg_try_advisory_lock) - @database_connection = database_connection - @name = name - @type = type - @lock_obtained = false - register_advisory_lock if postgres? - end - - def with_lock - if postgres? - @database_connection.with_advisory_lock(@name) do - logger.debug("Lock #{@name} obtained") - @lock_obtained = true - yield - end - else - logger.debug("Executing without lock as this is not a postgres database") - @lock_obtained = true - yield - end - end - - def lock_obtained? - @lock_obtained - end - - private - - def postgres? - @database_connection.adapter_scheme.to_s =~ /postgres/ - end - - - def register_advisory_lock - @database_connection.extension :pg_advisory_lock - unless @database_connection.registered_advisory_locks.key?(@name) - logger.debug("Registering postgres lock of type #{@type} with name #{@name}") - begin - @database_connection.register_advisory_lock(@name, @type) - rescue Sequel::Postgres::PgAdvisoryLock::LockAlreadyRegistered => e - logger.info(e.message) - end - end - end - end -end From 6b5a9df1e16aacc83563fbb75e1ec4de310a3bbb Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Tue, 26 Mar 2024 12:49:07 +1100 Subject: [PATCH 11/14] docs: update comments --- lib/pact_broker/db/advisory_lock.rb | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/lib/pact_broker/db/advisory_lock.rb b/lib/pact_broker/db/advisory_lock.rb index 921d6d4a3..b0d9e0e19 100644 --- a/lib/pact_broker/db/advisory_lock.rb +++ b/lib/pact_broker/db/advisory_lock.rb @@ -9,23 +9,8 @@ # This is a wrapper around the actual implementation code in the Sequel extension from https://github.com/yuryroot/sequel-pg_advisory_lock # which was copied into this codebase and modified for usage in this codebase. # -# Docs on lock types from From https://www.postgresql.org/docs/9.1/functions-admin.html +# See https://www.postgresql.org/docs/16/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS for docs on lock types # -# pg_advisory_lock locks an application-defined resource. -# If another session already holds a lock on the same resource identifier, this function will wait until the -# resource becomes available. -# The lock is exclusive. -# Multiple lock requests stack, so that if the same resource is locked three times it must then be unlocked three -# times to be released for other sessions' use. - -# pg_advisory_lock_shared works the same as pg_advisory_lock, except the lock can be shared with other sessions -# requesting shared locks. Only would-be exclusive lockers are locked out. - -# pg_try_advisory_lock is similar to pg_advisory_lock, except the function will not wait for the lock to become available. -# It will either obtain the lock immediately and return true, or return false if the lock cannot be acquired immediately. - -# There are race conditions with the in-memory lock registry that may cause problems when running multiple threads. -# To handle this, the code catches and ignores the error Sequel::Postgres::PgAdvisoryLock::LockAlreadyRegistered module PactBroker module DB From 23af86198b3401964039198fcd50797d70b3731b Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Tue, 26 Mar 2024 12:58:25 +1100 Subject: [PATCH 12/14] style: rubocop --- lib/sequel/extensions/pg_advisory_lock.rb | 8 ++++---- .../extensions/register_advisory_lock_test.rb | 18 +++++++++--------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/lib/sequel/extensions/pg_advisory_lock.rb b/lib/sequel/extensions/pg_advisory_lock.rb index f1ff37bc4..01fc81716 100644 --- a/lib/sequel/extensions/pg_advisory_lock.rb +++ b/lib/sequel/extensions/pg_advisory_lock.rb @@ -4,9 +4,9 @@ # should not cause a Sequel::Error to be raised. # Also, I wanted it to use Concurrent::Hash for multi-threaded environments. -require 'sequel' -require 'zlib' -require 'concurrent/hash' +require "sequel" +require "zlib" +require "concurrent/hash" module Sequel module Postgres @@ -33,7 +33,7 @@ def registered_advisory_locks @registered_advisory_locks ||= Concurrent::Hash.new end - def with_advisory_lock(name, id = nil, &block) + def with_advisory_lock(name, id = nil) options = registered_advisory_locks.fetch(name.to_sym) lock_key = options.fetch(:key) diff --git a/spec/lib/sequel/extensions/register_advisory_lock_test.rb b/spec/lib/sequel/extensions/register_advisory_lock_test.rb index 421be02dc..3115c64be 100644 --- a/spec/lib/sequel/extensions/register_advisory_lock_test.rb +++ b/spec/lib/sequel/extensions/register_advisory_lock_test.rb @@ -3,7 +3,7 @@ describe Sequel::Postgres::PgAdvisoryLock do subject { Sequel::Model.db } - describe '#register_advisory_lock' do + describe "#register_advisory_lock" do let(:supported_lock_functions) do [ :pg_advisory_lock, @@ -23,7 +23,7 @@ subject.registered_advisory_locks.clear end - it 'base check' do + it "base check" do lock_name = :test_lock expect(subject.registered_advisory_locks[lock_name]).to be nil @@ -31,7 +31,7 @@ expect(default_lock_function).to eq subject.registered_advisory_locks[lock_name].fetch(:lock_function) end - it 'should register locks for all supported PostgreSQL functions' do + it "should register locks for all supported PostgreSQL functions" do supported_lock_functions.each do |lock_function| lock_name = "#{lock_function}_test".to_sym @@ -41,28 +41,28 @@ end end - it 'should prevent specifying not supported PostgreSQL function as lock type' do + it "should prevent specifying not supported PostgreSQL function as lock type" do lock_name = :not_supported_lock_function_test lock_function = :not_supported_lock_function expect { subject.register_advisory_lock(lock_name, lock_function) }.to raise_error(Sequel::Error, /Invalid lock function/) end - it 'should prevent registering multiple locks with same name and different functions' do + it "should prevent registering multiple locks with same name and different functions" do lock_name = :multiple_locks_with_same_name_test subject.register_advisory_lock(lock_name, supported_lock_functions[0]) expect { subject.register_advisory_lock(lock_name, supported_lock_functions[1]) }.to raise_error(Sequel::Error, /Lock with name .+ is already registered/) end - it 'should allow registering multiple locks with same name and same functions' do + it "should allow registering multiple locks with same name and same functions" do lock_name = :multiple_locks_with_same_name_test subject.register_advisory_lock(lock_name, supported_lock_functions[0]) expect { subject.register_advisory_lock(lock_name, supported_lock_functions[0]) }.to_not raise_error end - it 'registered locks must have different lock keys' do + it "registered locks must have different lock keys" do quantity = 100 quantity.times do |index| lock_name = "test_lock_#{index}".to_sym @@ -74,10 +74,10 @@ expect(all_keys.size).to eq all_keys.uniq.size end - it 'mapping between lock name and lock key must be constant' do + it "mapping between lock name and lock key must be constant" do expect(subject.registered_advisory_locks).to be_empty - lock_names_keys_mapping = YAML.load_file(File.join(File.dirname(__FILE__), 'lock_names_keys.yml')) + lock_names_keys_mapping = YAML.load_file(File.join(File.dirname(__FILE__), "lock_names_keys.yml")) lock_names_keys_mapping.each do |lock_name, valid_lock_key| lock_name = lock_name.to_sym From a779f25c03f173cc292cec5ab832572a8b73d239 Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Tue, 26 Mar 2024 13:00:44 +1100 Subject: [PATCH 13/14] style: rubocop --- lib/pact_broker/tasks/clean_task.rb | 1 + lib/sequel/extensions/pg_advisory_lock.rb | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/pact_broker/tasks/clean_task.rb b/lib/pact_broker/tasks/clean_task.rb index 809793440..699c2886c 100644 --- a/lib/pact_broker/tasks/clean_task.rb +++ b/lib/pact_broker/tasks/clean_task.rb @@ -97,6 +97,7 @@ def with_lock if !lock.lock_obtained? output("Clean was not performed as a clean is already in progress. Exiting.") end + results else yield end diff --git a/lib/sequel/extensions/pg_advisory_lock.rb b/lib/sequel/extensions/pg_advisory_lock.rb index 01fc81716..b9a1ea1b1 100644 --- a/lib/sequel/extensions/pg_advisory_lock.rb +++ b/lib/sequel/extensions/pg_advisory_lock.rb @@ -78,7 +78,7 @@ def register_advisory_lock(name, lock_function = DEFAULT_LOCK_FUNCTION) end key = advisory_lock_key_for(name) - name_for_key = registered_advisory_locks.keys.find { |name| registered_advisory_locks[name].fetch(:key) == key } + name_for_key = registered_advisory_locks.keys.find { |n| registered_advisory_locks[n].fetch(:key) == key } if name_for_key && name_for_key != name raise Error, "Lock key #{key} is already taken" end From 2d8700a4c780b33015fa6214f1e392f91d2e613f Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Tue, 26 Mar 2024 13:02:13 +1100 Subject: [PATCH 14/14] style: whitespace --- lib/pact_broker/db/advisory_lock.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/pact_broker/db/advisory_lock.rb b/lib/pact_broker/db/advisory_lock.rb index b0d9e0e19..d59e1a257 100644 --- a/lib/pact_broker/db/advisory_lock.rb +++ b/lib/pact_broker/db/advisory_lock.rb @@ -49,7 +49,6 @@ def postgres? @database_connection.adapter_scheme.to_s =~ /postgres/ end - def register_advisory_lock @database_connection.extension :pg_advisory_lock @database_connection.register_advisory_lock(@name, @type)