Add s3/DB Sync report
johnf committed Oct 13, 2024
1 parent d569019 commit 1cece25
Showing 8 changed files with 282 additions and 242 deletions.
10 changes: 10 additions & 0 deletions app/mailers/admin_mailer.rb
@@ -0,0 +1,10 @@
class AdminMailer < ApplicationMailer
default to: '[email protected]'

def catalog_s3_sync_report
@s3_only = params[:s3_only]
@db_only = params[:db_only]

mail(subject: "[NABU Admin] Catalog S3 Sync Report: #{Date.today.strftime('%F')}")
end
end
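
For context, `AdminMailer.catalog_s3_sync_report` is a parameterized mailer: the two file lists arrive via `params`, so callers trigger the report with `.with`, exactly as the validator service below does. A minimal sketch (the identifiers are made-up placeholders):

# Sketch only; placeholder identifiers, not real catalog files
AdminMailer.with(
  db_only: ['AA1/001/aa1-001-a.wav'],   # in the database but missing from S3
  s3_only: ['AA1/002/aa1-002-b.mp3']    # in S3 but missing from the database
).catalog_s3_sync_report.deliver_now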
156 changes: 79 additions & 77 deletions app/services/catalog_db_sync_validator_service.rb
@@ -1,116 +1,118 @@
require 'csv'
require 'aws-sdk-s3'

class CatalogDbSyncValidatorService
attr_reader :catalog_dir, :verbose

def initialize(verbose: false)
@catalog_dir = '/srv/catalog'
@verbose = verbose
def initialize(env)
@bucket = "nabu-meta-#{env}"
@prefix = "inventories/catalog/nabu-catalog-#{env}/CatalogBucketInventory0/"

# Strange bug in dev docker
ENV.delete('AWS_SECRET_ACCESS_KEY')
ENV.delete('AWS_ACCESS_KEY_ID')
ENV.delete('AWS_SESSION_TOKEN')

@s3 = Aws::S3::Client.new(region: 'ap-southeast-2')
end

def run
process_collections
end
inventory_dir = find_recent_inventory_dir
inventory_csv = fetch_inventory_csv(inventory_dir)

private
s3_files = extract_s3_files(inventory_csv)

def process_items(collection, db_items)
item_ids(collection.identifier).each do |item_id|
item = db_items.find { |i| i.identifier == item_id }
unless item
# FIXME: Put this back
# puts "WARNING: ITEM LEVEL: #{collection.identifier}/#{item_id} does not exist in the database"
next
end
essence_files = Essence
.includes(item: [:collection])
.map(&:full_identifier)

db_essences = item.essences.to_a.each { | essence | essence.filename = essence.filename.downcase }
process_essences(collection, item, db_essences)
end
db_only = essence_files - s3_files
s3_only = s3_files - essence_files

AdminMailer.with(db_only:, s3_only:).catalog_s3_sync_report.deliver_now
end

def process_essences(collection, item, db_essences)
essences = essence_ids(collection.identifier, item.identifier)
files = essences.map { |essence| essence.sub(/\.*[a-zA-Z0-9]+$/, '') }.uniq
private

essences.each do |essence_id|
next if essence_id =~ /#{collection.identifier}-#{item.identifier}-(CAT|LIST|df|df_revised|df_ammended)-PDSC_ADMIN.(xml|pdf|html|html|rtf)/
def extract_s3_files(inventory_csv)
s3_files = []

if essence_id =~ /PDSC_ADMIN/
fileprefix = essence_id.sub(/-(spectrum|thumb|checksum|soundimage|preview|sjo01df|ind01df|asfdf|mwfdf|amhdf|ban01df|ropdf|jpn04df|jpn02df|tcidf|mjkdf|jpndf|kac01df)-PDSC_ADMIN\.(jpg|json|txt|pdf|ogg|htm|jpgf.tif)$/, '')
next if files.include?(fileprefix)
CSV.parse(inventory_csv, headers: false) do |row|
_bucket_name, filename, _version_id, is_latest, delete_marker, _size, _last_modified, _etag,
storage_class, multiple_upload, multipart_upload_flag, replication_status, checksum_algo = row

puts "WARNING: ITEM LEVEL: #{collection.identifier}/#{item.identifier}/#{essence_id} is unknown PDSC_ADMIN file"
next
end
next if is_latest == 'false' || delete_marker == 'true'

essence = db_essences.find { |i| i.filename == essence_id.downcase }
unless essence
puts "WARNING: ITEM LEVEL: #{collection.identifier}/#{item.identifier}/#{essence_id} does not exist in the database"
next
end
s3_files << CGI.unescape(filename)
end
end

def process_collections
collection_ids.each do |collection_id|
puts "## #{collection_id}" if verbose
s3_files = s3_files.reject { |filename| filename.ends_with?('pdsc_admin/ro-crate-metadata.json') }
.reject { |filename| filename.starts_with?('pdsc_admin/') && filename.ends_with?('-deposit.pdf') }
# TODO: Remove this after we migrate all the df files
.reject { |filename| filename.ends_with?('df-PDSC_ADMIN.pdf') }

collection = Collection.find_by(identifier: collection_id)
unless collection
puts "WARNING: COLLECTION LEVEL: #{collection_id} does not exist in the database"
next
end

process_items(collection, collection.items.to_a)
if s3_files.size != s3_files.uniq.size
raise 'Duplicate files in S3 inventory'
end
end

def essence_ids(collection_id, item_id)
ids = []

Dir.entries(File.join(catalog_dir, collection_id, item_id)).each do |dir|
next if ['.', '..'].include?(dir)
s3_files
end

unless File.file?(File.join(catalog_dir, collection_id, item_id, dir))
puts "WARNING: ITEM LEVEL: #{collection_id}/#{item_id}/#{dir} is not a file"
next
end
def fetch_inventory_csv(inventory_dir)
manifest_json = @s3.get_object(bucket: @bucket, key: "#{inventory_dir}manifest.json").body.read
manifest = JSON.parse(manifest_json)

ids << dir
files = manifest['files']
if files.size > 1
raise 'Multiple files in manifest'
end

ids
end
file = files.first['key']

def item_ids(collection_id)
ids = []
# Download the S3 Inventory CSV file
inventory_gzipped = @s3.get_object(bucket: @bucket, key: file).body.read
inventory_csv = Zlib::GzipReader.new(StringIO.new(inventory_gzipped)).read
end

Dir.entries(File.join(catalog_dir, collection_id)).each do |dir|
next if ['.', '..'].include?(dir)
def find_recent_inventory_dir
inventory_files = fetch_inventory_files

unless File.directory?(File.join(catalog_dir, collection_id, dir))
puts "WARNING: ITEM LEVEL: #{collection_id}/#{dir} is not a directory"
next
# Extract the timestamp part from each key and convert it to Time object
timestamped_files = inventory_files.map do |key|
match = key.match(/CatalogBucketInventory0\/(\d{4})-(\d{2})-(\d{2})T(\d{2})-(\d{2})Z/)
if match
year, month, day, hour, minute = match.captures
time = Time.new(year, month, day, hour, minute)
{ key: key, time: time }
end
end.compact

ids << dir
end
# Find the most recent file
most_recent_dir = timestamped_files.max_by { |file| file[:time] }

ids
most_recent_dir[:key]
end

def collection_ids
ids = []
def fetch_inventory_files
inventory_files = []
next_token = nil

Dir.entries(catalog_dir).each do |dir|
next if ['.', '..', '.afm', '0001-Backups', '0002-Migration'].include?(dir)
loop do
response = @s3.list_objects_v2(
bucket: @bucket,
prefix: @prefix,
delimiter: '/',
continuation_token: next_token
)

unless File.directory?(File.join(catalog_dir, dir))
puts "WARNING: COLLECTION LEVEL: #{dir} is not a directory"
next
end
# Collect all object keys
inventory_files += response.common_prefixes.map(&:prefix)

break unless response.is_truncated

ids << dir
next_token = response.next_continuation_token
end

ids
inventory_files
end
end
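
The sync check in `run` hinges on `Essence#full_identifier` lining up exactly with the object keys in the S3 inventory (assumed here to follow a `<collection>/<item>/<filename>` layout). The report itself is just two array differences; an illustrative sketch with placeholder values:

# Illustrative only; placeholder identifiers, but this is the set arithmetic run performs
essence_files = ['AA1/001/aa1-001-a.wav', 'AA1/001/aa1-001-b.eaf']
s3_files      = ['AA1/001/aa1-001-a.wav', 'AA1/002/aa1-002-a.wav']

db_only = essence_files - s3_files # => ["AA1/001/aa1-001-b.eaf"]  (in the DB, missing from S3)
s3_only = s3_files - essence_files # => ["AA1/002/aa1-002-a.wav"]  (in S3, missing from the DB)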
154 changes: 154 additions & 0 deletions app/services/junk_service.rb
@@ -0,0 +1,154 @@
require 'csv'
require 'aws-sdk-s3'

# NOTE: We use this service for random one-off scripts we need over time
class JunkService
attr_reader :catalog_dir, :verbose

def initialize(env)
@bucket = "nabu-meta-#{env}"
@prefix = "inventories/catalog/nabu-catalog-#{env}/CatalogBucketInventory0/"

# Strange bug in dev docker
ENV.delete('AWS_SECRET_ACCESS_KEY')
ENV.delete('AWS_ACCESS_KEY_ID')
ENV.delete('AWS_SESSION_TOKEN')

@s3 = Aws::S3::Client.new(region: 'ap-southeast-2')
end

# def run
# inventory_dir = find_recent_inventory_dir
# inventory_csv = fetch_inventory_csv(inventory_dir)
#
# s3_files = extract_s3_files(inventory_csv)
#
# s3_files.select! { |filename| filename.end_with?('PDSC_ADMIN.pdf') }
#
# collections = {}
# s3_files.each do |filename|
# collection = filename.split('/')[0]
# collections[collection] ||= []
# collections[collection] << filename
# end
#
# catalog_bucket = 'nabu-catalog-prod'
#
# collections.each do |collection, files|
# if files.size != 1
# puts "Collection: #{collection}"
# files.each do |file|
# puts " #{file}"
# end
# puts
# end
#
# # TODO MOVE
# src = files[0]
# dst = "#{collection}/pdsc_admin/#{collection}-deposit.pdf"
# print "Moving #{src} to #{dst} - "
#
# begin
# @s3.head_object(bucket: catalog_bucket, key: dst)
# puts 'ERROR: dst exists skipping'
# next
# rescue Aws::S3::Errors::NotFound
# # We don't want it to exist
# end
#
# begin
# @s3.copy_object(bucket: catalog_bucket, copy_source: "#{catalog_bucket}/#{src}", key: dst)
# @s3.delete_object(bucket: catalog_bucket, key: src)
# rescue Aws::S3::Errors::NoSuchKey => e
# puts 'Something went wrong moving'
# puts e
# exit 1
# end
#
# puts 'OK'
# end
# end
#
# private
#
# def extract_s3_files(inventory_csv)
# s3_files = []
#
# CSV.parse(inventory_csv, headers: false) do |row|
# _bucket_name, filename, _version_id, is_latest, delete_marker, _size, _last_modified, _etag,
# storage_class, multiple_upload, multipart_upload_flag, replication_status, checksum_algo = row
#
# next if is_latest == 'false' || delete_marker == 'true'
#
# s3_files << CGI.unescape(filename)
# end
#
# if s3_files.size != s3_files.uniq.size
# raise 'Duplicate files in S3 inventory'
# end
#
# s3_files
# end
#
# def fetch_inventory_csv(inventory_dir)
# manifest_json = @s3.get_object(bucket: @bucket, key: "#{inventory_dir}manifest.json").body.read
# manifest = JSON.parse(manifest_json)
#
# files = manifest['files']
# if files.size > 1
# raise 'Multiple files in manifest'
# end
#
# file = files.first['key']
#
# # Download the S3 Inventory CSV file
# puts "Downloading S3 Inventory CSV file: #{file}"
# inventory_gzipped = @s3.get_object(bucket: @bucket, key: file).body.read
# puts "Unzipping file: #{file}\n\n"
# inventory_csv = Zlib::GzipReader.new(StringIO.new(inventory_gzipped)).read
# end
#
# def find_recent_inventory_dir
# inventory_files = fetch_inventory_files
#
# # Extract the timestamp part from each key and convert it to Time object
# timestamped_files = inventory_files.map do |key|
# match = key.match(/CatalogBucketInventory0\/(\d{4})-(\d{2})-(\d{2})T(\d{2})-(\d{2})Z/)
# if match
# year, month, day, hour, minute = match.captures
# time = Time.new(year, month, day, hour, minute)
# { key: key, time: time }
# end
# end.compact
#
# # Find the most recent file
# most_recent_dir = timestamped_files.max_by { |file| file[:time] }
#
# puts "Most recent inventory file: #{most_recent_dir[:key]}"
#
# most_recent_dir[:key]
# end
#
# def fetch_inventory_files
# inventory_files = []
# next_token = nil
#
# loop do
# response = @s3.list_objects_v2(
# bucket: @bucket,
# prefix: @prefix,
# delimiter: '/',
# continuation_token: next_token
# )
#
# # Collect all object keys
# inventory_files += response.common_prefixes.map(&:prefix)
#
# break unless response.is_truncated
#
# next_token = response.next_continuation_token
# end
#
# inventory_files
# end
end
13 changes: 13 additions & 0 deletions app/views/admin_mailer/catalog_s3_sync_report.text.erb
@@ -0,0 +1,13 @@
This report runs weekly and compares the files in the S3 bucket with the files in the database.

## Files in database but missing from S3

<% @db_only.each do |file| -%>
= <%= file %>
<% end -%>

## Files in S3 but missing from the database

<% @s3_only.each do |file| -%>
= <%= file %>
<% end -%>
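
To eyeball the rendered report without sending mail, an ActionMailer preview can feed the template sample lists. This preview class is not part of the commit; it is a sketch, and the path and identifiers are placeholders:

# test/mailers/previews/admin_mailer_preview.rb (hypothetical)
class AdminMailerPreview < ActionMailer::Preview
  def catalog_s3_sync_report
    AdminMailer.with(
      db_only: ['AA1/001/aa1-001-a.wav'],
      s3_only: ['AA1/002/aa1-002-b.mp3']
    ).catalog_s3_sync_report
  end
end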
13 changes: 13 additions & 0 deletions cron-worker/cron.rb
@@ -13,6 +13,19 @@

scheduler = Rufus::Scheduler.new

scheduler.cron '27 4 * * 2' do
name = 'DB/S3 Sync'
task = 'catalog:check_db_s3_sync'

puts "#{Time.current}: Starting task #{name}"

begin
Rake::Task[task].invoke
ensure
Rake::Task[task].reenable
end
end

scheduler.cron '10 1 * * *' do
name = 'Mint Dois'
task = 'archive:mint_dois'
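
The new schedule '27 4 * * 2' fires weekly (Tuesdays at 04:27), matching the "runs weekly" wording in the email template. The `catalog:check_db_s3_sync` rake task it invokes is one of the changed files not rendered on this page; assuming it is a thin wrapper around the service, it would look roughly like this sketch:

# lib/tasks/catalog.rake - sketch only; the real task ships in this commit but is not shown above
namespace :catalog do
  desc 'Compare the S3 catalog inventory against the database and email a report'
  task check_db_s3_sync: :environment do
    # 'prod' is an assumption; the service only needs the env suffix used in the
    # nabu-meta-<env> and nabu-catalog-<env> bucket names.
    CatalogDbSyncValidatorService.new('prod').run
  end
end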
