Skip to content

Commit

Permalink
Distribution Imports: run in a background queue (#142)
Browse files Browse the repository at this point in the history
* Distributions: move imports to background queue

Addresses #137
  • Loading branch information
ewlarson authored Jan 14, 2025
1 parent dd218a3 commit 313e193
Show file tree
Hide file tree
Showing 40 changed files with 969 additions and 74 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ The gem is available as open source under the terms of the [Apache 2.0 License](
* ~~Debug Rails 7.2 support (remove devise_invitable, see [#915](https://github.com/scambra/devise_invitable/issues/915))~~
* ~~Separate dct_references_s support into a separate model~~
* ~~Import/Export dct_references_s outside of the main document model~~
* Distributions: Move import to background queue
* Data Dictionary: Add support for `document_data_dictionary`
* Gazetteer: Add GeoNames support
* Gazetteer: Add Who's On First support
Expand Down
38 changes: 0 additions & 38 deletions app/controllers/admin/document_distributions_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,44 +112,6 @@ def destroy_all
end
end

# GET/POST /documents/1/distributions/import
#
# Imports document distributions from a file. If successful, redirects with a success notice.
# Otherwise, redirects with an error notice.
def import
  # Every outcome ends the same way: log the message, then redirect to the
  # distributions index (scoped to the document when document_id is present)
  # with that same message as the flash notice.
  finish_with = lambda do |message|
    logger.debug(message)

    destination =
      if params[:document_id]
        admin_document_document_distributions_path(@document)
      else
        admin_document_distributions_path
      end

    redirect_to destination, notice: message
  end

  # GET requests fall through without importing anything.
  return if request.get?

  logger.debug("Import Distributions")

  upload = params.dig(:document_distribution, :distributions, :file)
  raise ArgumentError, "File does not exist or is invalid." unless upload

  success, errors = DocumentDistribution.import(upload)

  if success == true
    finish_with.call("Distributions were created successfully.")
  else
    finish_with.call("Some distributions could not be created. #{errors.join(", ")}")
  end
rescue => e
  # Any failure (missing upload included) is reported through the same redirect.
  finish_with.call("Distributions could not be created. #{e}")
end

private

# Sets the document based on the document_id parameter.
Expand Down
126 changes: 126 additions & 0 deletions app/controllers/admin/import_distributions_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# frozen_string_literal: true

# Admin::ImportDistributionsController
#
# This controller handles the CRUD operations for ImportDistribution objects within the admin namespace.
# It provides actions to list, show, create, update, and delete import distributions, as well as run an import distribution.
#
# Before Actions:
# - set_import_distribution: Sets the @import_distribution instance variable for actions that require an import distribution ID.
#
# Actions:
# - index: Lists all import distributions with pagination.
# - show: Displays a specific import distribution and its import document distributions, with pagination for success and failed states.
# - new: Initializes a new ImportDistribution object.
# - edit: Prepares an existing ImportDistribution object for editing.
# - create: Creates a new ImportDistribution object and redirects to the import distribution if successful.
# - update: Updates an existing ImportDistribution object and redirects to the import distribution if successful.
# - destroy: Deletes an ImportDistribution object and redirects to the import distributions list.
# - run: Executes the import process and redirects to the import distribution show page.
#
# Private Methods:
# - set_import_distribution: Finds and sets the import distribution based on the provided ID.
# - permittable_params: Returns an array of permitted parameters for an import distribution.
# - import_distribution_params: Permits parameters for creating or updating an import distribution.
module Admin
  # Admin CRUD for ImportDistribution records, plus a `run` action that
  # starts the import for a record.
  class ImportDistributionsController < Admin::AdminController
    before_action :set_import_distribution, only: %i[show edit update destroy run]

    # GET /import_distributions
    # Lists import distributions, newest first, 20 per page.
    def index
      @pagy, @import_distributions = pagy(ImportDistribution.order(created_at: :desc), items: 20)
    end

    # GET /import_distributions/1
    # Shows one import distribution with two independently paginated lists of
    # its import document distributions: those in the :success state and
    # those in any other state. Each list uses its own page parameter.
    def show
      attempts = @import_distribution.import_document_distributions

      @pagy_failed, @import_failed_distributions =
        pagy(attempts.not_in_state(:success), items: 50, page_param: :failed_page)
      @pagy_success, @import_success_distributions =
        pagy(attempts.in_state(:success), items: 50, page_param: :success_page)
    end

    # GET /import_distributions/new
    # Builds an unsaved ImportDistribution for the form.
    def new
      @import_distribution = ImportDistribution.new
    end

    # GET /import_distributions/1/edit
    # The record is loaded by the set_import_distribution before_action.
    def edit
    end

    # POST /import_distributions
    # POST /import_distributions.json
    # Creates an ImportDistribution from the permitted params. On success,
    # HTML redirects to the record and JSON renders :show with 201; on
    # failure both respond with 422.
    def create
      @import_distribution = ImportDistribution.new(import_distribution_params)

      respond_to do |format|
        if @import_distribution.save
          format.html do
            redirect_to admin_import_distribution_path(@import_distribution),
              notice: "Import distribution was successful."
          end
          format.json { render :show, status: :created, location: @import_distribution }
        else
          format.html { render :new, status: :unprocessable_entity }
          format.json { render json: @import_distribution.errors, status: :unprocessable_entity }
        end
      end
    end

    # PATCH/PUT /import_distributions/1
    # PATCH/PUT /import_distributions/1.json
    # Updates the ImportDistribution. On success, HTML redirects to the
    # record and JSON renders :show; on failure both respond with 422.
    def update
      respond_to do |format|
        if @import_distribution.update(import_distribution_params)
          format.html { redirect_to admin_import_distribution_path(@import_distribution), notice: "Import distribution was successfully updated." }
          format.json { render :show, status: :ok, location: @import_distribution }
        else
          format.html { render :edit, status: :unprocessable_entity }
          format.json { render json: @import_distribution.errors, status: :unprocessable_entity }
        end
      end
    end

    # DELETE /import_distributions/1
    # DELETE /import_distributions/1.json
    # Destroys the ImportDistribution, then redirects to the list (HTML) or
    # responds with 204 (JSON).
    def destroy
      @import_distribution.destroy
      respond_to do |format|
        format.html { redirect_to admin_import_distributions_url, notice: "Import distribution was successfully destroyed." }
        format.json { head :no_content }
      end
    end

    # Starts the import and redirects to the show page.
    #
    # The state machine only allows the :imported transition once, so a
    # record that cannot make that transition has already been run. In that
    # case redirect with an alert instead of calling run! again, which would
    # raise and could queue the same CSV a second time.
    def run
      unless @import_distribution.state_machine.can_transition_to?(:imported)
        redirect_to admin_import_distribution_url(@import_distribution),
          alert: "Import distribution has already been run."
        return
      end

      @import_distribution.run!
      redirect_to admin_import_distribution_url(@import_distribution), notice: "Import distribution is running. Check back soon for results."
    end

    private

    # Loads the ImportDistribution identified by params[:id].
    def set_import_distribution
      @import_distribution = ImportDistribution.find(params[:id])
    end

    # Attribute names that may be mass-assigned from the request.
    def permittable_params
      %i[name filename source description row_count encoding content_type extension validity validation_result
        csv_file run]
    end

    # Strong parameters for create and update.
    def import_distribution_params
      params.require(:import_distribution).permit(permittable_params)
    end
  end
end
32 changes: 32 additions & 0 deletions app/jobs/import_distributions_run_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# frozen_string_literal: true

# ImportDistributionsRunJob class
class ImportDistributionsRunJob < ApplicationJob
  queue_as :priority

  # Reads the import's attached CSV and, for every data row, records an
  # ImportDocumentDistribution and enqueues an ImportDocumentDistributionJob
  # for it. A row that raises is logged and skipped so the remaining rows
  # are still processed.
  #
  # @param import [ImportDistribution] record whose csv_file is read
  def perform(import)
    rows = CSV.parse(import.csv_file.download, headers: true)

    rows.each do |row|
      begin
        row_hash = row.to_h

        # Capture document distribution for import attempt
        attempt = ImportDocumentDistribution.create(attributes_for(row_hash, import))

        # Add import document distribution to background job queue
        ImportDocumentDistributionJob.perform_later(attempt)
      rescue => e
        logger.debug "\n\nCANNOT IMPORT DISTRIBUTION: #{row_hash.inspect}"
        logger.debug "Error: #{e.inspect}\n\n"
      end
    end
  end

  private

  # Maps one CSV row (keyed by header name) onto ImportDocumentDistribution attributes.
  def attributes_for(row_hash, import)
    attributes = %w[friendlier_id reference_type distribution_url label].each_with_object({}) do |column, memo|
      memo[column.to_sym] = row_hash[column]
    end
    attributes[:import_distribution_id] = import.id
    attributes
  end
end
23 changes: 23 additions & 0 deletions app/jobs/import_document_distribution_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true

# ImportDocumentDistributionJob class
class ImportDocumentDistributionJob < ApplicationJob
  queue_as :priority

  # Applies one imported row to DocumentDistribution and records the outcome
  # on the ImportDocumentDistribution's state machine.
  #
  # The row is matched (or created) on friendlier_id, reference_type and url,
  # then updated with the full attribute set. A failed update or any raised
  # error transitions the row to :failed with the details; otherwise it
  # transitions to :success.
  #
  # @param import_document_distribution [ImportDocumentDistribution]
  def perform(import_document_distribution)
    # Build the attribute hash once: to_hash already resolves the
    # ReferenceType, so reusing it avoids a second lookup and keeps the
    # find keys in sync with the attributes written below.
    attributes = import_document_distribution.to_hash

    document_distribution = DocumentDistribution.find_or_create_by(
      attributes.slice(:friendlier_id, :reference_type, :url)
    )

    if document_distribution.update(attributes)
      import_document_distribution.state_machine.transition_to!(:success)
    else
      import_document_distribution.state_machine.transition_to!(:failed, "Failed - #{document_distribution.errors.inspect}")
    end
  rescue => e
    logger.debug("Error: #{e}")
    import_document_distribution.state_machine.transition_to!(:failed, "Error - #{e.inspect}")
  end
end
55 changes: 55 additions & 0 deletions app/models/import_distribution.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# frozen_string_literal: true

require "csv"

# ImportDistribution class
# A CSV upload of document distributions that is imported by a background job.
class ImportDistribution < ApplicationRecord
  include ActiveModel::Validations

  # Callbacks (keep at top)
  after_commit :set_csv_file_attributes, if: :persisted?

  # Associations
  has_one_attached :csv_file
  has_many :document_distributions, dependent: :destroy
  has_many :import_document_distributions, dependent: :destroy
  has_many :import_distribution_transitions, autosave: false, dependent: :destroy

  # Validations
  validates :csv_file, attached: true, content_type: {in: "text/csv", message: "is not a CSV file"}

  validates_with ImportDistribution::CsvHeaderValidator

  # States
  include Statesman::Adapters::ActiveRecordQueries[
    transition_class: ImportDistributionTransition,
    initial_state: :created
  ]

  # Memoized state machine for this record.
  def state_machine
    @state_machine ||= ImportDistributionStateMachine.new(self, transition_class: ImportDistributionTransition)
  end

  # Caches facts about the attached CSV on the record: the header row, the
  # number of data rows (total rows minus the header), and the file's
  # content type, name and extension. Uses update_columns to write them.
  def set_csv_file_attributes
    parsed = CSV.parse(csv_file.download)

    update_columns(
      headers: parsed[0],
      row_count: parsed.size - 1,
      content_type: csv_file.content_type.to_s,
      filename: csv_file.filename.to_s,
      extension: csv_file.filename.extension.to_s
    )
  end

  # Marks the import as :imported and enqueues the job that processes the CSV.
  #
  # The transition runs first on purpose: transition_to! raises when the
  # record cannot move to :imported (the state machine only allows that from
  # :created), so an import that was already run fails before a second job
  # is queued for the same file.
  def run!
    # Capture State
    state_machine.transition_to!(:imported)

    # Queue Job
    ImportDistributionsRunJob.perform_later(self)

    save
  end
end
32 changes: 32 additions & 0 deletions app/models/import_distribution/csv_header_validator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# frozen_string_literal: true

require "csv"

# CSV Header Validation
class ImportDistribution
  # Validates that the attached CSV's first row contains every required
  # column header.
  class CsvHeaderValidator < ActiveModel::Validator
    # Column headers that must all be present in the first row.
    REQUIRED_HEADERS = %w[friendlier_id reference_type distribution_url label].freeze

    # Error added to :csv_file whenever the headers cannot be confirmed.
    MISSING_HEADERS_MESSAGE =
      "Missing a required CSV header. friendlier_id, reference_type, distribution_url, and label are required."

    # Adds an error on :csv_file unless the file's headers are valid.
    #
    # @param record [ImportDistribution] record being validated
    # @return [Boolean] true when the headers are valid, false otherwise
    def validate(record)
      return true if valid_csv_headers?(record.csv_file)

      record.errors.add(:csv_file, MISSING_HEADERS_MESSAGE)
      false
    end

    # Checks the first CSV row for the required headers.
    #
    # Returns false, rather than raising, when there is no file, the file is
    # empty, it cannot be parsed as CSV, or it cannot be downloaded.
    #
    # @param csv_file [#download, nil] attachment whose content is parsed
    # @return [Boolean]
    def valid_csv_headers?(csv_file)
      return false if csv_file.nil?

      # An empty file parses to no rows; Array(nil) turns that into [].
      headers = Array(CSV.parse(csv_file.download).first)
      (REQUIRED_HEADERS - headers).empty?
    rescue ArgumentError, CSV::MalformedCSVError, ActiveStorage::FileNotFoundError
      false
    end
  end
end
14 changes: 14 additions & 0 deletions app/models/import_distribution_state_machine.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

# Import Distribution Statesman
class ImportDistributionStateMachine
  include Statesman::Machine

  # :created is the initial state.
  state :created, initial: true
  state :imported
  state :success
  state :failed

  # Allowed moves: created -> imported, then imported -> success or failed.
  transition from: :created, to: %i[imported]
  transition from: :imported, to: [:success, :failed]
end
26 changes: 26 additions & 0 deletions app/models/import_distribution_transition.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

# Add Import Distribution Statesman Transitions
class ImportDistributionTransition < ApplicationRecord
  include Statesman::Adapters::ActiveRecordTransition

  # If the transition table has no default `updated_at` timestamp column,
  # set `updated_timestamp_column` to another column name
  # (e.g. `:updated_on`) or to `nil`:
  #
  # self.updated_timestamp_column = :updated_on
  # self.updated_timestamp_column = nil

  belongs_to :import_distribution, inverse_of: :import_distribution_transitions

  after_destroy :update_most_recent, if: :most_recent?

  private

  # After the transition flagged most_recent is destroyed, flag the remaining
  # transition with the highest sort_key instead. Does nothing when no
  # transitions remain.
  def update_most_recent
    newest = import_distribution.import_distribution_transitions.order(:sort_key).last
    newest.update_column(:most_recent, true) unless newest.blank?
  end
end
25 changes: 25 additions & 0 deletions app/models/import_document_distribution.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

# ImportDocumentDistribution class
class ImportDocumentDistribution < ApplicationRecord
  has_many :import_document_distribution_transitions, autosave: false, dependent: :destroy

  include Statesman::Adapters::ActiveRecordQueries[
    transition_class: ImportDocumentDistributionTransition,
    initial_state: :queued
  ]

  # Memoized state machine for this record.
  def state_machine
    @state_machine ||= ImportDocumentDistributionStateMachine.new(self, transition_class: ImportDocumentDistributionTransition)
  end

  # Attribute hash for the corresponding DocumentDistribution. The stored
  # reference_type name is resolved to a ReferenceType record (nil when no
  # record has that name) and distribution_url is exposed under :url.
  def to_hash
    resolved_reference_type = ReferenceType.find_by(name: reference_type)

    {
      friendlier_id: friendlier_id,
      reference_type: resolved_reference_type,
      url: distribution_url,
      label: label,
      import_distribution_id: import_distribution_id
    }
  end
end
Loading

0 comments on commit 313e193

Please sign in to comment.