All jobs now extend ActiveJob::Base
elohanlon committed Jan 2, 2025
1 parent 50af969 commit 39d8d02
Showing 10 changed files with 155 additions and 155 deletions.
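Every file below follows the same migration pattern: a plain Resque-style job class (class-level @queue ivar plus a self.perform class method) becomes a subclass of ActiveJob::Base (queue declared with queue_as, work done in an instance-level perform). A minimal before/after sketch of that pattern; the class names are placeholders and a bare symbol stands in for the Hyacinth::Queue constants used in the real code:

require 'active_job'

# Before (Resque style): the queue is a class-level ivar and Resque calls
# the class method perform directly with the enqueued arguments.
class ReindexJobBefore
  @queue = :digital_object_reindex # stand-in for Hyacinth::Queue::DIGITAL_OBJECT_REINDEX

  def self.perform(digital_object_pid)
    # ... look up the object and reindex it ...
  end
end

# After (ActiveJob): inherit from ActiveJob::Base, declare the queue with
# queue_as, and do the work in an instance-level perform.
class ReindexJobAfter < ActiveJob::Base
  queue_as :digital_object_reindex # stand-in for Hyacinth::Queue::DIGITAL_OBJECT_REINDEX

  def perform(digital_object_pid)
    # ... look up the object and reindex it ...
  end
end

# Enqueueing changes accordingly:
# Resque.enqueue(ReindexJobBefore, 'demo:1')   # old, asynchronous via Resque
ReindexJobAfter.perform_later('demo:1')        # new, asynchronous via the configured adapter
ReindexJobAfter.perform_now('demo:1')          # new, runs inline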
10 changes: 5 additions & 5 deletions app/jobs/export_search_results_to_csv_job.rb
@@ -1,4 +1,4 @@
class ExportSearchResultsToCsvJob
class ExportSearchResultsToCsvJob < ActiveJob::Base
include Hyacinth::Csv::Flatten

SUPPRESSED_ON_EXPORT = ['_uuid', '_data_file_path', '_dc_type', '_state', '_title', '_created', '_modified', '_created_by', '_modified_by', '_project.uri', '_project.short_label']
@@ -16,9 +16,9 @@ class ExportSearchResultsToCsvJob
]
CONTROLLED_TERM_CORE_SUBFIELDS_ALLOWED_ON_IMPORT = ['uri', 'value', 'authority', 'type']

@queue = Hyacinth::Queue::DIGITAL_OBJECT_CSV_EXPORT
queue_as Hyacinth::Queue::DIGITAL_OBJECT_CSV_EXPORT

def self.perform(csv_export_id)
def perform(csv_export_id)
start_time = Time.now

csv_export = CsvExport.find(csv_export_id)
@@ -59,7 +59,7 @@ def self.perform(csv_export_id)
csv_export.save
end

def self.map_temp_field_indexes(search_params, user, map = {})
def map_temp_field_indexes(search_params, user, map = {})
# Common fields to all objects
map['_pid'] ||= map.length
map['_project.string_key'] ||= map.length
@@ -111,7 +111,7 @@ def self.map_temp_field_indexes(search_params, user, map = {})
map
end

def self.write_csv(path_to_csv_file, field_list, field_index_map)
def write_csv(path_to_csv_file, field_list, field_index_map)
# Open new CSV for writing
CSV.open(path_to_csv_file, 'wb') do |final_csv|
# Write out human-friendly column display labels
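Because perform and its helpers (map_temp_field_indexes, write_csv) are now instance methods rather than class methods, callers use ActiveJob's class-level entry points or an explicit instance. A short sketch of the new calling convention, assuming a CsvExport record plus the search_params and user values already in scope:

# Enqueue onto the queue declared by queue_as:
ExportSearchResultsToCsvJob.perform_later(csv_export.id)

# Or run inline, which is what lib/hyacinth/queue.rb does when
# HYACINTH[:queue_long_jobs] is false:
ExportSearchResultsToCsvJob.perform_now(csv_export.id)

# The former class-level helpers are reached through an instance now, e.g. in specs:
job = ExportSearchResultsToCsvJob.new
field_index_map = job.map_temp_field_indexes(search_params, user)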
29 changes: 15 additions & 14 deletions app/jobs/process_digital_object_import_job.rb
@@ -1,9 +1,10 @@
class ProcessDigitalObjectImportJob
@queue = Hyacinth::Queue::DIGITAL_OBJECT_IMPORT_LOW
class ProcessDigitalObjectImportJob < ActiveJob::Base
queue_as Hyacinth::Queue::DIGITAL_OBJECT_IMPORT_LOW

UNEXPECTED_PROCESSING_ERROR_RETRY_DELAY = 60
FIND_DIGITAL_OBJECT_IMPORT_RETRY_DELAY = 10

def self.perform(digital_object_import_id)
def perform(digital_object_import_id)
# If the import job was previously deleted (and does not exist in the database), return immediately
return unless DigitalObjectImport.exists?(digital_object_import_id)

@@ -44,7 +45,7 @@ def self.perform(digital_object_import_id)
handle_unexpected_processing_error(digital_object_import_id, e)
end

def self.find_or_create_digital_object(digital_object_data, user, digital_object_import)
def find_or_create_digital_object(digital_object_data, user, digital_object_import)
if existing_object?(digital_object_data)
# We're updating data for an existing object
existing_object_for_update(digital_object_data, user)
@@ -58,7 +59,7 @@ def self.find_or_create_digital_object(digital_object_data, user, digital_object
# digital_object_import instance because when it's called, we can't guarantee
# that we were able to successfully obtain a digital_object_import instance.
# We try multiple times, within this method, to obtain an instance.
def self.handle_unexpected_processing_error(digital_object_import_id, e, queue_long_jobs = HYACINTH[:queue_long_jobs])
def handle_unexpected_processing_error(digital_object_import_id, e, queue_long_jobs = HYACINTH[:queue_long_jobs])
# In the case of some unexpected, otherwise unhandled error, mark this job
# as a failure so that it doesn't get stuck as pending forever, causing
# other jobs that depend on it to be requeued forever.
@@ -77,18 +78,18 @@ def self.handle_unexpected_processing_error(digital_object_import_id, e, queue_l
end
end

def self.find_digital_object_import_with_retry(digital_object_import_id)
def find_digital_object_import_with_retry(digital_object_import_id)
# Retry in case database is briefly unavailable
Retriable.retriable(tries: 3, base_interval: FIND_DIGITAL_OBJECT_IMPORT_RETRY_DELAY) do
return DigitalObjectImport.find(digital_object_import_id)
end
end

def self.existing_object?(digital_object_data)
def existing_object?(digital_object_data)
digital_object_data['pid'].present? && DigitalObject::Base.exists?(digital_object_data['pid'])
end

def self.assign_data(digital_object, digital_object_data, digital_object_import)
def assign_data(digital_object, digital_object_data, digital_object_import)
digital_object.set_digital_object_data(digital_object_data, true)
:success
rescue Hyacinth::Exceptions::ParentDigitalObjectNotFoundError => e
@@ -99,17 +100,17 @@ def self.assign_data(digital_object, digital_object_data, digital_object_import)
:failure
end

def self.exception_with_backtrace_as_error_message(e)
def exception_with_backtrace_as_error_message(e)
e.message + "\n<span class=\"backtrace\">Backtrace:\n\t#{e.backtrace.join("\n\t")}</span>"
end

def self.existing_object_for_update(digital_object_data, user)
def existing_object_for_update(digital_object_data, user)
digital_object = DigitalObject::Base.find(digital_object_data['pid'])
digital_object.updated_by = user
digital_object
end

def self.new_object(digital_object_type, user, digital_object_import)
def new_object(digital_object_type, user, digital_object_import)
digital_object = DigitalObjectType.get_model_for_string_key(digital_object_type || :missing).new
digital_object.created_by = user
digital_object.updated_by = user
@@ -119,7 +120,7 @@ def self.new_object(digital_object_type, user, digital_object_import)
nil
end

def self.handle_success_or_failure(status, digital_object, digital_object_import, do_solr_commit)
def handle_success_or_failure(status, digital_object, digital_object_import, do_solr_commit)
if status == :success && digital_object.save(do_solr_commit)
digital_object_import.digital_object_errors = []
digital_object_import.status = :success
@@ -137,7 +138,7 @@ def self.handle_success_or_failure(status, digital_object, digital_object_import
# If prerequisite rows are pending, then this digital_object_import will be requeued.
# If prerequisite rows failed, then this digital_object_import will also fail with
# a prerequisite-related error message.
def self.prerequisite_row_check(digital_object_import, queue_long_jobs = HYACINTH[:queue_long_jobs])
def prerequisite_row_check(digital_object_import, queue_long_jobs = HYACINTH[:queue_long_jobs])
# If this import has prerequisite_csv_row_numbers, make sure that the job
# with that prerequisite_csv_row_numbers has been completed. If it hasn't,
# we'll re-queue this job.
@@ -192,7 +193,7 @@ def self.prerequisite_row_check(digital_object_import, queue_long_jobs = HYACINT
true
end

def self.handle_remaining_prerequisite_case(digital_object_import, queue_long_jobs)
def handle_remaining_prerequisite_case(digital_object_import, queue_long_jobs)
if queue_long_jobs
# If prerequisite are still pending, then re-queue this import
digital_object_import.digital_object_errors = [] # clear earlier errors if we're re-queueing
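The collapsed hunks in this file handle re-queueing (after unexpected processing errors and while prerequisite rows are still pending). That code is not shown above, but under ActiveJob a delayed re-enqueue of the same job would look roughly like this sketch; the resque-scheduler line is only for orientation and is not necessarily what the hidden hunks contain:

# Sketch: from inside an instance method of ProcessDigitalObjectImportJob,
# re-enqueue this import after the retry delay.
self.class.set(wait: UNEXPECTED_PROCESSING_ERROR_RETRY_DELAY.seconds)
          .perform_later(digital_object_import_id)

# Rough pre-ActiveJob equivalent (requires resque-scheduler):
# Resque.enqueue_in(UNEXPECTED_PROCESSING_ERROR_RETRY_DELAY,
#                   ProcessDigitalObjectImportJob, digital_object_import_id)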
6 changes: 3 additions & 3 deletions app/jobs/reindex_digital_object_job.rb
@@ -1,7 +1,7 @@
class ReindexDigitalObjectJob
@queue = Hyacinth::Queue::DIGITAL_OBJECT_REINDEX
class ReindexDigitalObjectJob < ActiveJob::Base
queue_as Hyacinth::Queue::DIGITAL_OBJECT_REINDEX

def self.perform(digital_object_pid)
def perform(digital_object_pid)
# Pass false param to update_index() method so that we don't do a Solr commit for each update (because that would be inefficient).
DigitalObject::Base.find(digital_object_pid).update_index(false)
end
98 changes: 48 additions & 50 deletions lib/hyacinth/csv/flatten.rb
@@ -1,65 +1,63 @@
module Hyacinth::Csv::Flatten
extend ActiveSupport::Concern

module ClassMethods
def keys_for_document(document, omit_blank_values = false)
document = document.clone
df_data = document.delete(DigitalObject::DynamicField::DATA_KEY) || {}
def keys_for_document(document, omit_blank_values = false)
document = document.clone
df_data = document.delete(DigitalObject::DynamicField::DATA_KEY) || {}

internals = pointers_for_hash(document, omit_blank_values)
internals.map! { |pointer| Hyacinth::Csv::Fields::Internal.new(pointer) }
dynamics = pointers_for_hash(df_data, omit_blank_values)
dynamics.map! { |pointer| Hyacinth::Csv::Fields::Dynamic.new(pointer) }
internals.map(&:to_header) + dynamics.map(&:to_header)
end
internals = pointers_for_hash(document, omit_blank_values)
internals.map! { |pointer| Hyacinth::Csv::Fields::Internal.new(pointer) }
dynamics = pointers_for_hash(df_data, omit_blank_values)
dynamics.map! { |pointer| Hyacinth::Csv::Fields::Dynamic.new(pointer) }
internals.map(&:to_header) + dynamics.map(&:to_header)
end

def pointers_for_hash(hash, omit_blank_values, prefix = [])
keys = []
hash.each do |key, value|
if value.is_a?(Array)
key_prefix = prefix + [key]
keys += pointers_for_array(value, omit_blank_values, key_prefix)
elsif uri_hash?(value)
key_prefix = prefix + [key]
keys += pointers_for_uri(value, omit_blank_values, key_prefix)
elsif value.is_a?(Hash)
key_prefix = prefix + [key]
keys += pointers_for_hash(value, omit_blank_values, key_prefix)
else
key = pointer_for_value(key, value, omit_blank_values, prefix)
keys << key unless keys.include?(key) || key.nil?
end
def pointers_for_hash(hash, omit_blank_values, prefix = [])
keys = []
hash.each do |key, value|
if value.is_a?(Array)
key_prefix = prefix + [key]
keys += pointers_for_array(value, omit_blank_values, key_prefix)
elsif uri_hash?(value)
key_prefix = prefix + [key]
keys += pointers_for_uri(value, omit_blank_values, key_prefix)
elsif value.is_a?(Hash)
key_prefix = prefix + [key]
keys += pointers_for_hash(value, omit_blank_values, key_prefix)
else
key = pointer_for_value(key, value, omit_blank_values, prefix)
keys << key unless keys.include?(key) || key.nil?
end
keys.uniq
end
keys.uniq
end

def pointers_for_array(array, omit_blank_values, prefix)
keys = []
array.each_with_index do |value, index|
if value.is_a? Hash
keys += pointers_for_hash(value, omit_blank_values, prefix + [index])
else
key = pointer_for_value(index, value, omit_blank_values, prefix)
keys << key unless keys.include?(key) || key.nil?
end
def pointers_for_array(array, omit_blank_values, prefix)
keys = []
array.each_with_index do |value, index|
if value.is_a? Hash
keys += pointers_for_hash(value, omit_blank_values, prefix + [index])
else
key = pointer_for_value(index, value, omit_blank_values, prefix)
keys << key unless keys.include?(key) || key.nil?
end
keys
end
keys
end

def pointer_for_value(key, value, omit_blank_values, prefix = [])
return nil if omit_blank_values && !value.is_a?(TrueClass) && !value.is_a?(FalseClass) && value.blank?
prefix + [key]
end
def pointer_for_value(key, value, omit_blank_values, prefix = [])
return nil if omit_blank_values && !value.is_a?(TrueClass) && !value.is_a?(FalseClass) && value.blank?
prefix + [key]
end

def uri_hash?(hash)
hash.is_a?(Hash) && hash.key?('uri') && !hash['uri'].is_a?(Hash)
end
def uri_hash?(hash)
hash.is_a?(Hash) && hash.key?('uri') && !hash['uri'].is_a?(Hash)
end

def pointers_for_uri(hash, omit_blank_values, prefix = [])
return nil if omit_blank_values && hash.blank?
hash.map do |key, value|
pointer_for_value("#{prefix[-1]}.#{key}", value, omit_blank_values, prefix[0...-1])
end.compact
end
def pointers_for_uri(hash, omit_blank_values, prefix = [])
return nil if omit_blank_values && hash.blank?
hash.map do |key, value|
pointer_for_value("#{prefix[-1]}.#{key}", value, omit_blank_values, prefix[0...-1])
end.compact
end
end
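Despite its size, the diff above is mostly a de-indentation: the module ClassMethods wrapper is removed, so keys_for_document, pointers_for_hash, pointers_for_array, pointer_for_value, uri_hash? and pointers_for_uri are now mixed in as instance methods of whatever includes Hyacinth::Csv::Flatten (such as the export job above), instead of being attached as class methods through ActiveSupport::Concern. A toy illustration of the difference, with made-up names:

module Flattener
  # Defined directly in the module body (no ClassMethods block), this method
  # becomes an instance method of any class that includes the module.
  def keys_for_document(document)
    document.keys
  end
end

class ExportJob
  include Flattener
end

ExportJob.new.keys_for_document('a' => 1)  # => ["a"]
# ExportJob.keys_for_document(...)         # would now raise NoMethodError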
12 changes: 6 additions & 6 deletions lib/hyacinth/queue.rb
@@ -35,25 +35,25 @@ def self.process_digital_object_import(digital_object_import)
raise 'Invalid priority: ' + priority.inspect
end

Resque.enqueue_to(queue_name, ProcessDigitalObjectImportJob, digital_object_import_id)
ProcessDigitalObjectImportJob.set(queue: queue_name).perform_later(digital_object_import_id)
else
ProcessDigitalObjectImportJob.perform(digital_object_import_id)
ProcessDigitalObjectImportJob.perform_now(digital_object_import_id)
end
end

def self.export_search_results_to_csv(csv_export_id)
if HYACINTH[:queue_long_jobs]
Resque.enqueue(ExportSearchResultsToCsvJob, csv_export_id)
ExportSearchResultsToCsvJob.perform_later(csv_export_id)
else
ExportSearchResultsToCsvJob.perform(csv_export_id)
ExportSearchResultsToCsvJob.perform_now(csv_export_id)
end
end

def self.reindex_digital_object(digital_object_pid)
if HYACINTH[:queue_long_jobs]
Resque.enqueue(ReindexDigitalObjectJob, digital_object_pid)
ReindexDigitalObjectJob.perform_later(digital_object_pid)
else
ReindexDigitalObjectJob.perform(digital_object_pid)
ReindexDigitalObjectJob.perform_now(digital_object_pid)
end
end
end
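The set(queue: ...) call preserves the per-priority queue selection that Resque.enqueue_to provided: queue_as on ProcessDigitalObjectImportJob supplies a default queue, and set overrides it for a single enqueue. A condensed sketch of the equivalence, where queue_name comes from the priority logic earlier in this method:

# Old: Resque.enqueue_to(queue_name, ProcessDigitalObjectImportJob, digital_object_import_id)
# New: override the job's default queue for this one enqueue.
ProcessDigitalObjectImportJob.set(queue: queue_name).perform_later(digital_object_import_id)

# set also accepts wait: / wait_until: if a delayed enqueue is ever needed, e.g.
# ProcessDigitalObjectImportJob.set(queue: queue_name, wait: 30.seconds).perform_later(digital_object_import_id)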
1 change: 0 additions & 1 deletion spec/features/digital_object_editor_ui_spec.rb
@@ -9,6 +9,5 @@

it "can create a new Digital Object", :js => true do
expect(page).to have_content 'New Digital Object'

end
end
4 changes: 2 additions & 2 deletions spec/integration/csv_export_import_round_trip_spec.rb
@@ -95,7 +95,7 @@
'fq' => { 'hyacinth_type_sim' => [{ 'does_not_equal' => 'publish_target' }] }
})
)
ExportSearchResultsToCsvJob.perform(first_csv_export.id)
ExportSearchResultsToCsvJob.perform_now(first_csv_export.id)
first_csv_export.reload # Reload the ActiveRecord object, getting the latest data in the DB (so we have the path to the csv file)
path_to_first_csv_file = first_csv_export.path_to_csv_file

@@ -131,7 +131,7 @@
'fq' => { 'hyacinth_type_sim' => [{ 'does_not_equal' => 'publish_target' }] }
})
)
ExportSearchResultsToCsvJob.perform(second_csv_export.id)
ExportSearchResultsToCsvJob.perform_now(second_csv_export.id)
second_csv_export.reload # Reload the ActiveRecord object, getting the latest data in the DB (so we have the path to the csv file)
path_to_second_csv_file = second_csv_export.path_to_csv_file
expect(CSV.read(path_to_first_csv_file)).to eq(CSV.read(path_to_second_csv_file))
8 changes: 5 additions & 3 deletions spec/jobs/export_search_results_to_csv_job_spec.rb
@@ -10,12 +10,14 @@
let(:user) { User.new }
let(:search_params) { Hash.new }

let(:instance) { described_class.new }

before do
allow(DigitalObject::Base).to receive(:search_in_batches)
.and_yield(doc1).and_yield(doc2)
end

describe '.map_temp_field_indexes' do
describe '#map_temp_field_indexes' do
let(:expected) do
{
'_pid' => 0,
@@ -27,7 +29,7 @@
}
end

subject { ExportSearchResultsToCsvJob.map_temp_field_indexes(search_params, user) }
subject { instance.map_temp_field_indexes(search_params, user) }

it do
is_expected.to eql expected
@@ -79,7 +81,7 @@
allow(CsvExport).to receive(:find).with(export_id).and_return(export)
allow(DigitalObject::Base).to receive(:search_in_batches).and_yield(sample_json_document)
expect(export).to receive(:path_to_csv_file).and_call_original
described_class.perform(export_id)
instance.perform(export_id)
expect(export.number_of_records_processed).to eq(1)
actual_csv = CSV.read(export.path_to_csv_file)

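With perform now an instance method, this spec builds the job via described_class.new and calls perform and map_temp_field_indexes on that instance, bypassing the queue entirely. Either style below exercises the job body synchronously in a spec (a sketch; export_id is the let value defined in this file):

# Call the instance directly, as this spec now does:
job = described_class.new
job.perform(export_id)

# Or go through ActiveJob's synchronous entry point:
described_class.perform_now(export_id)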