Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clean): use postgres advisory locks to ensure only one process can run a clean at a time #672

Merged
merged 14 commits into from
Mar 27, 2024
Merged
33 changes: 24 additions & 9 deletions lib/pact_broker/app.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
require "pact_broker/api/authorization/resource_access_policy"
require "pact_broker/api/middleware/http_debug_logs"
require "pact_broker/application_context"
require "pact_broker/db/advisory_lock"

module PactBroker

Expand Down Expand Up @@ -101,22 +102,19 @@ def post_configure

def prepare_database
logger.info "Database schema version is #{PactBroker::DB.version(configuration.database_connection)}"
lock = PactBroker::DB::AdvisoryLock.new(configuration.database_connection, :migrate, :pg_advisory_lock)
if configuration.auto_migrate_db
migration_options = { allow_missing_migration_files: configuration.allow_missing_migration_files }
if PactBroker::DB.is_current?(configuration.database_connection, migration_options)
logger.info "Skipping database migrations as the latest migration has already been applied"
else
logger.info "Migrating database schema"
PactBroker::DB.run_migrations configuration.database_connection, migration_options
logger.info "Database schema version is now #{PactBroker::DB.version(configuration.database_connection)}"
lock.with_lock do
ensure_all_database_migrations_are_applied
end
else
logger.info "Skipping database schema migrations as database auto migrate is disabled"
end

if configuration.auto_migrate_db_data
logger.info "Migrating data"
PactBroker::DB.run_data_migrations configuration.database_connection
lock.with_lock do
run_data_migrations
end
else
logger.info "Skipping data migrations"
end
Expand All @@ -125,6 +123,23 @@ def prepare_database
PactBroker::Webhooks::Service.fail_retrying_triggered_webhooks
end

def ensure_all_database_migrations_are_applied
migration_options = { allow_missing_migration_files: configuration.allow_missing_migration_files }

if PactBroker::DB.is_current?(configuration.database_connection, migration_options)
logger.info "Skipping database migrations as the latest migration has already been applied"
else
logger.info "Migrating database schema"
PactBroker::DB.run_migrations(configuration.database_connection, migration_options)
logger.info "Database schema version is now #{PactBroker::DB.version(configuration.database_connection)}"
end
end

def run_data_migrations
logger.info "Migrating data"
PactBroker::DB.run_data_migrations(configuration.database_connection)
end

def load_configuration_from_database
configuration.load_from_database!
end
Expand Down
58 changes: 58 additions & 0 deletions lib/pact_broker/db/advisory_lock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
require "pact_broker/logging"

# Uses a Postgres advisory lock to ensure that a given block of code can only have ONE
# thread in excution at a time against a given database.
# When the database is not Postgres, the block will yield without any locks, allowing
# this class to be used safely with other database types, but without the locking
# functionality.
#
# This is a wrapper around the actual implementation code in the Sequel extension from https://github.com/yuryroot/sequel-pg_advisory_lock
# which was copied into this codebase and modified for usage in this codebase.
#
# See https://www.postgresql.org/docs/16/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS for docs on lock types
#

module PactBroker
module DB
class AdvisoryLock
include PactBroker::Logging

def initialize(database_connection, name, type = :pg_try_advisory_lock)
@database_connection = database_connection
@name = name
@type = type
@lock_obtained = false
register_advisory_lock if postgres?
end

def with_lock
if postgres?
@database_connection.with_advisory_lock(@name) do
logger.debug("Lock #{@name} obtained")
@lock_obtained = true
yield
end
else
logger.warn("Executing block without lock as this is not a Postgres database")
@lock_obtained = true
yield
end
end

def lock_obtained?
@lock_obtained
end

private

def postgres?
@database_connection.adapter_scheme.to_s =~ /postgres/
end

def register_advisory_lock
@database_connection.extension :pg_advisory_lock
@database_connection.register_advisory_lock(@name, @type)
end
end
end
end
94 changes: 68 additions & 26 deletions lib/pact_broker/tasks/clean_task.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# This task is used to clean up old data in a Pact Broker database
# to stop performance issues from slowing down responses when there is
# too much data.
# See https://docs.pact.io/pact_broker/administration/maintenance

module PactBroker
module DB
class CleanTask < ::Rake::TaskLib
Expand All @@ -7,11 +12,13 @@ class CleanTask < ::Rake::TaskLib
attr_accessor :version_deletion_limit
attr_accessor :logger
attr_accessor :dry_run
attr_accessor :use_lock # allow disabling of postgres lock if it is causing problems

def initialize &block
require "pact_broker/db/clean_incremental"
@version_deletion_limit = 1000
@dry_run = false
@use_lock = true
@keep_version_selectors = PactBroker::DB::CleanIncremental::DEFAULT_KEEP_SELECTORS
rake_task(&block)
end
Expand All @@ -28,42 +35,77 @@ def rake_task &block
namespace :db do
desc "Clean unnecessary pacts and verifications from database"
task :clean do | _t, _args |

instance_eval(&block)

require "pact_broker/db/clean_incremental"
require "pact_broker/error"
require "yaml"
require "benchmark"
with_lock do
perform_clean
end
end
end
end
end

def perform_clean
require "pact_broker/db/clean_incremental"
require "pact_broker/error"
require "yaml"
require "benchmark"

raise PactBroker::Error.new("You must specify the version_deletion_limit") unless version_deletion_limit
raise PactBroker::Error.new("You must specify the version_deletion_limit") unless version_deletion_limit

prefix = dry_run ? "[DRY RUN] " : ""
if keep_version_selectors.nil? || keep_version_selectors.empty?
raise PactBroker::Error.new("You must specify which versions to keep")
else
add_defaults_to_keep_selectors
output "Deleting oldest #{version_deletion_limit} versions, keeping versions that match the configured selectors", keep_version_selectors.collect(&:to_hash)
end

if keep_version_selectors.nil? || keep_version_selectors.empty?
raise PactBroker::Error.new("You must specify which versions to keep")
else
add_defaults_to_keep_selectors
output "#{prefix}Deleting oldest #{version_deletion_limit} versions, keeping versions that match the configured selectors", keep_version_selectors.collect(&:to_hash)
end
start_time = Time.now
results = PactBroker::DB::CleanIncremental.call(database_connection,
keep: keep_version_selectors,
limit: version_deletion_limit,
logger: logger,
dry_run: dry_run
)
end_time = Time.now
elapsed_seconds = (end_time - start_time).to_i
output "Results (#{elapsed_seconds} seconds)", results
end

start_time = Time.now
results = PactBroker::DB::CleanIncremental.call(database_connection,
keep: keep_version_selectors,
limit: version_deletion_limit,
logger: logger,
dry_run: dry_run
)
end_time = Time.now
elapsed_seconds = (end_time - start_time).to_i
output "Results (#{elapsed_seconds} seconds)", results
end
# Use a Postgres advisory lock to ensure that only one clean can run at a time.
# This allows a cron schedule to be used on the Pact Broker Docker image when deployed
# on a multi-instance architecture, without all the instances stepping on each other's toes.
#
# Any tasks that attempt to run while a clean job is running will skip the clean
# and exit with a message and a success code.
#
# To test that the lock works, run:
# script/docker/db-start.sh
# script/docker/db-migrate.sh
# for i in {0..3}; do PACT_BROKER_TEST_DATABASE_URL=postgres://postgres:postgres@localhost/postgres bundle exec rake pact_broker:db:clean &; done;
#
# There will be 3 messages saying "Clean was not performed" and output from one thread showing the clean is being done.
def with_lock
if use_lock
require "pact_broker/db/advisory_lock"

lock = PactBroker::DB::AdvisoryLock.new(database_connection, :clean, :pg_try_advisory_lock)
results = lock.with_lock do
yield
end

if !lock.lock_obtained?
output("Clean was not performed as a clean is already in progress. Exiting.")
end
results
else
yield
end
end

def output string, payload = {}
logger ? logger.info(string, payload) : puts("#{string} #{payload.to_json}")
def output(string, payload = {})
prefix = dry_run ? "[DRY RUN] " : ""
logger ? logger.info("#{prefix}#{string}") : puts("#{prefix}#{string} #{payload.to_json}")
end

def add_defaults_to_keep_selectors
Expand Down
101 changes: 101 additions & 0 deletions lib/sequel/extensions/pg_advisory_lock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Copied with thanks from https://github.com/yuryroot/sequel-pg_advisory_lock/blob/d7509aa/lib/sequel/extensions/pg_advisory_lock.rb
# The reason this is copy/pasted and modified is that I wanted to allow exact duplicate
# locks to be registered because different threads running the same code
# should not cause a Sequel::Error to be raised.
# Also, I wanted it to use Concurrent::Hash for multi-threaded environments.

require "sequel"
require "zlib"
require "concurrent/hash"

module Sequel
module Postgres
module PgAdvisoryLock

SESSION_LEVEL_LOCKS = [
:pg_advisory_lock,
:pg_try_advisory_lock
].freeze

TRANSACTION_LEVEL_LOCKS = [
:pg_advisory_xact_lock,
:pg_try_advisory_xact_lock
].freeze

LOCK_FUNCTIONS = (SESSION_LEVEL_LOCKS + TRANSACTION_LEVEL_LOCKS).freeze

DEFAULT_LOCK_FUNCTION = :pg_advisory_lock
UNLOCK_FUNCTION = :pg_advisory_unlock

class LockAlreadyRegistered < Sequel::Error; end

def registered_advisory_locks
@registered_advisory_locks ||= Concurrent::Hash.new
end

def with_advisory_lock(name, id = nil)
options = registered_advisory_locks.fetch(name.to_sym)

lock_key = options.fetch(:key)
function_params = [lock_key, id].compact

lock_function = options.fetch(:lock_function)
transaction_level_lock = TRANSACTION_LEVEL_LOCKS.include?(lock_function)

if transaction_level_lock
# TODO: It's allowed to specify additional options (in particular, :server)
# while opening database transaction.
# That's why this check must be smarter.
unless in_transaction?
raise Error, "Transaction must be manually opened before using transaction level lock '#{lock_function}'"
end

if get(Sequel.function(lock_function, *function_params))
yield
end
else
synchronize do
if get(Sequel.function(lock_function, *function_params))
begin
result = yield
ensure
get(Sequel.function(UNLOCK_FUNCTION, *function_params))
result
end
end
end
end
end

# Beth: not sure how much extra value this registration provides.
# It turns the name into a number, and makes sure the name/number is unique,
# and that you don't try and use a different lock function with the same name.
def register_advisory_lock(name, lock_function = DEFAULT_LOCK_FUNCTION)
name = name.to_sym

if registered_advisory_locks.key?(name) && registered_advisory_locks[name][:lock_function] != lock_function
raise LockAlreadyRegistered, "Lock with name :#{name} is already registered with a different lock function (#{registered_advisory_locks[name][:lock_function]})"
end

key = advisory_lock_key_for(name)
name_for_key = registered_advisory_locks.keys.find { |n| registered_advisory_locks[n].fetch(:key) == key }
if name_for_key && name_for_key != name
raise Error, "Lock key #{key} is already taken"
end

function = lock_function.to_sym
unless LOCK_FUNCTIONS.include?(function)
raise Error, "Invalid lock function :#{function}"
end

registered_advisory_locks[name] = { key: key, lock_function: function }
end

def advisory_lock_key_for(lock_name)
Zlib.crc32(lock_name.to_s) % 2 ** 31
end
end
end

Database.register_extension(:pg_advisory_lock, Postgres::PgAdvisoryLock)
end
3 changes: 3 additions & 0 deletions script/docker/db-migrate.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

PACT_BROKER_TEST_DATABASE_URL=postgres://postgres:postgres@localhost/postgres bundle exec rake pact_broker:db:migrate
Loading