From 9ca02bcfc610af04062cdc4402e2a113ef1ba420 Mon Sep 17 00:00:00 2001 From: Brandt Lareau Date: Tue, 26 Nov 2024 10:16:14 -0800 Subject: [PATCH] Enhance Job Management, Optimize Queries, and Introduce Thread-Safe Features (#512) --- app/components/load_disk_process_component.rb | 2 +- app/components/rip_process_component.rb | 4 +- app/components/scan_plex_process_component.rb | 2 +- app/components/upload_process_component.rb | 5 +- app/controllers/images_controller.rb | 2 + app/controllers/movies_controller.rb | 7 +- app/controllers/seasons_controller.rb | 15 +- app/controllers/tvs_controller.rb | 5 +- app/listeners/disk_listener.rb | 2 +- app/models/disk.rb | 2 +- app/models/disk_title.rb | 11 +- app/models/job.rb | 6 +- app/models/video_blob.rb | 2 +- app/services/create_disks_service.rb | 20 ++- app/services/create_mkv_service.rb | 5 +- app/services/disk_info_service.rb | 4 + .../episode_disk_title_selector_service.rb | 8 +- app/tool_box/mkv_parser.rb | 2 +- app/views/seasons/show.html.erb | 12 +- app/workers/application_worker.rb | 30 +--- app/workers/load_disk_worker.rb | 6 +- app/workers/rip_worker.rb | 4 +- app/workers/upload_worker.rb | 7 +- config/environments/development.rb | 2 + config/environments/test.rb | 2 + config/initializers/backgrounder.rb | 137 +++++++++++++----- config/locales/en.yml | 56 ++++++- current_version.txt | 2 +- db/migrate/20241123005122_add_segment_map.rb | 5 + db/schema.rb | 3 +- spec/factories/disk_titles.rb | 1 + spec/models/disk_title_spec.rb | 3 +- spec/services/create_mkv_service_spec.rb | 4 +- spec/services/disk_info_service_spec.rb | 8 +- ...pisode_disk_title_selector_service_spec.rb | 9 ++ .../movie_disk_title_selector_service_spec.rb | 2 + 36 files changed, 282 insertions(+), 115 deletions(-) create mode 100644 db/migrate/20241123005122_add_segment_map.rb diff --git a/app/components/load_disk_process_component.rb b/app/components/load_disk_process_component.rb index 2021d5ca..1c1b5b0b 100644 --- a/app/components/load_disk_process_component.rb +++ b/app/components/load_disk_process_component.rb @@ -4,7 +4,7 @@ class LoadDiskProcessComponent < ViewComponent::Base extend Dry::Initializer def self.job - Job.sort_by_created_at.active.find_by(name: 'LoadDiskWorker') + Backgrounder.managers.find { _1.current_job&.name == 'LoadDiskWorker' }&.current_job end def self.show? diff --git a/app/components/rip_process_component.rb b/app/components/rip_process_component.rb index cf045795..5f8583c2 100644 --- a/app/components/rip_process_component.rb +++ b/app/components/rip_process_component.rb @@ -2,11 +2,11 @@ class RipProcessComponent < ViewComponent::Base def self.job - Job.sort_by_created_at.active.find_by(name: 'RipWorker') + Backgrounder.managers.find { _1.current_job&.name == 'RipWorker' }&.current_job end def self.show? - Video.auto_start.any? || job&.active? + job&.active? || Video.auto_start.any? end def hidden? diff --git a/app/components/scan_plex_process_component.rb b/app/components/scan_plex_process_component.rb index 36333393..e4042396 100644 --- a/app/components/scan_plex_process_component.rb +++ b/app/components/scan_plex_process_component.rb @@ -4,7 +4,7 @@ class ScanPlexProcessComponent < ViewComponent::Base extend Dry::Initializer def self.job - Job.sort_by_created_at.active.find_by(name: 'ScanPlexWorker') + Backgrounder.managers.find { _1.current_job&.name == 'ScanPlexWorker' }&.current_job end def self.show? diff --git a/app/components/upload_process_component.rb b/app/components/upload_process_component.rb index a49c6626..7d49982b 100644 --- a/app/components/upload_process_component.rb +++ b/app/components/upload_process_component.rb @@ -2,7 +2,7 @@ class UploadProcessComponent < ViewComponent::Base def self.job - Job.sort_by_created_at.active.find_by(name: 'UploadWorker') + Backgrounder.managers.find { _1.current_job&.name == 'UploadWorker' }&.current_job end def self.show? @@ -18,7 +18,7 @@ def dom_id end def uploadable_video_blobs - @uploadable_video_blobs ||= VideoBlob.uploadable.order(updated_at: :desc) + @uploadable_video_blobs ||= VideoBlob.uploadable.includes(:video, :episode_last, episode: { season: :tv }).order(updated_at: :desc) end def percentage(completed, total) @@ -27,6 +27,7 @@ def percentage(completed, total) def uploaded_recently_video_blobs @uploaded_recently_video_blobs ||= VideoBlob.uploaded_recently + .includes(:video, :episode_last, episode: [:season]) .order(uploaded_on: :desc) .limit(3) end diff --git a/app/controllers/images_controller.rb b/app/controllers/images_controller.rb index ab28dd07..42bbe08f 100644 --- a/app/controllers/images_controller.rb +++ b/app/controllers/images_controller.rb @@ -2,6 +2,8 @@ class ImagesController < ApplicationController HOST_URL = 'https://image.tmdb.org/t/p' + + skip_before_action :mkv_config, :movie_db_config, :plex_config def show send_data image, disposition: :inline, type: "image/#{params[:format]}" end diff --git a/app/controllers/movies_controller.rb b/app/controllers/movies_controller.rb index a9776182..ebf68cbc 100644 --- a/app/controllers/movies_controller.rb +++ b/app/controllers/movies_controller.rb @@ -3,10 +3,11 @@ class MoviesController < ApplicationController # movie GET /movies/:id(.:format) def show + scope = Movie.includes(:video_blobs) @movie = if params[:id].start_with?('tmdb:') - Movie.find_or_initialize_by(the_movie_db_id: params[:id].split(':').last) + scope.find_or_initialize_by(the_movie_db_id: params[:id].split(':').last) else - Movie.find(params[:id]) + scope.find(params[:id]) end @movie.subscribe(TheMovieDb::VideoListener.new) @movie.save! @@ -16,7 +17,7 @@ def show # rip_movie POST /movies/:id/rip(.:format) def rip movie = Movie.find(params[:id]) - disk = Disk.find(params[:disk_id]) + disk = Disk.includes(disk_titles: %i[episode episode_last]).find(params[:disk_id]) disk_titles = rip_disk_titles(disk, movie) job = RipWorker.perform_async( disk_id: disk.id, diff --git a/app/controllers/seasons_controller.rb b/app/controllers/seasons_controller.rb index 6427d168..0f1cd966 100644 --- a/app/controllers/seasons_controller.rb +++ b/app/controllers/seasons_controller.rb @@ -2,11 +2,16 @@ class SeasonsController < ApplicationController def show - @tv = Tv.find(params[:tv_id]) - @season = @tv.seasons.includes(episodes: [:ripped_disk_titles]).find(params[:id]) + @tv = Tv.includes(:ripped_disk_titles).find(params[:tv_id]) + @season = @tv.seasons.includes( + :ripped_disk_titles, + episodes: [ + { uploaded_video_blobs: %i[episode episode_last] }, { ripped_disk_titles: %i[episode_last] }, :video_blobs + ] + ).find(params[:id]) @season.subscribe(TheMovieDb::EpisodesListener.new) @season.save! - @disks = Disk.not_ejected + @disks = Disk.not_ejected.includes(:disk_titles) end def rip @@ -17,11 +22,11 @@ def rip private def season - @season ||= tv.seasons.find(params[:id]) + @season ||= tv.seasons.includes(:episodes).find(params[:id]) end def disk - @disk ||= Disk.find(params[:disk_id]) + @disk ||= Disk.includes(disk_titles: [:episode]).find(params[:disk_id]) end def tv diff --git a/app/controllers/tvs_controller.rb b/app/controllers/tvs_controller.rb index 575cb4aa..f776d840 100644 --- a/app/controllers/tvs_controller.rb +++ b/app/controllers/tvs_controller.rb @@ -2,10 +2,11 @@ class TvsController < ApplicationController def show + scope = Tv.includes(:seasons) @tv = if params[:id].start_with?('tmdb:') - Tv.find_or_initialize_by(the_movie_db_id: params[:id].split(':').last) + scope.find_or_initialize_by(the_movie_db_id: params[:id].split(':').last) else - Tv.find(params[:id]) + scope.find(params[:id]) end @tv.subscribe(TheMovieDb::VideoListener.new) @tv.save! diff --git a/app/listeners/disk_listener.rb b/app/listeners/disk_listener.rb index 6c5d970b..5c5349a6 100644 --- a/app/listeners/disk_listener.rb +++ b/app/listeners/disk_listener.rb @@ -32,7 +32,7 @@ def disk_loading(_) end def disk_loaded(disk) - return reload_page! unless (video = Video.auto_start.first) + return reload_page! unless (video = Video.auto_start.includes(:ripped_disk_titles).first) info_disk_titles = rip_disk_titles(disk, video) return reload_page! if info_disk_titles.empty? diff --git a/app/models/disk.rb b/app/models/disk.rb index ba99414d..971c0e86 100644 --- a/app/models/disk.rb +++ b/app/models/disk.rb @@ -25,7 +25,7 @@ class Disk < ApplicationRecord after_commit { broadcast(:disk_updated, self) } - has_many :disk_titles, dependent: :nullify, autosave: true + has_many :disk_titles, -> { order(:segment_map) }, dependent: :nullify, autosave: true, inverse_of: :disk has_many :not_ripped_disk_titles, -> { not_ripped }, dependent: false, inverse_of: :disk, class_name: 'DiskTitle' belongs_to :video, optional: true belongs_to :episode, optional: true diff --git a/app/models/disk_title.rb b/app/models/disk_title.rb index 34dafb43..d21e4925 100644 --- a/app/models/disk_title.rb +++ b/app/models/disk_title.rb @@ -11,6 +11,7 @@ # filename :string not null # name :string # ripped_at :datetime +# segment_map :string # size :integer default(0), not null # created_at :datetime not null # updated_at :datetime not null @@ -34,6 +35,8 @@ class DiskTitle < ApplicationRecord include ActionView::Helpers::DateHelper + serialize :segment_map, coder: JSON, type: Array + belongs_to :video, optional: true belongs_to :episode, optional: true belongs_to :episode_last, optional: true, class_name: 'Episode' @@ -43,13 +46,19 @@ class DiskTitle < ApplicationRecord scope :not_ripped, -> { where(ripped_at: nil) } scope :ripped, -> { where.not(ripped_at: nil) } + scope :sort_by_segment_map, -> { order(:segment_map) } validates :filename, presence: true before_save :set_episode_last def to_label - "##{title_id} #{name || filename} #{distance_of_time_in_words(duration)}" + [ + "##{title_id}", + filename || name, + distance_of_time_in_words(duration, 0, include_seconds: false, scope: 'datetime.distance_in_words.short'), + segment_map.join(', ') + ].compact_blank.join(' ') end def episode_numbers diff --git a/app/models/job.rb b/app/models/job.rb index de90c0e0..2220ef4c 100644 --- a/app/models/job.rb +++ b/app/models/job.rb @@ -82,8 +82,12 @@ def perform nil end + def name_constant + @name_constant ||= name.constantize + end + def worker - @worker ||= name.constantize.new(**arguments.symbolize_keys, job: self) + @worker ||= name_constant.new(**arguments.symbolize_keys, job: self) rescue StandardError => e record_exception!(e) broadcast_page_reload! diff --git a/app/models/video_blob.rb b/app/models/video_blob.rb index fe30a44b..ca6df999 100644 --- a/app/models/video_blob.rb +++ b/app/models/video_blob.rb @@ -105,7 +105,7 @@ def title def episode_numbers return if episode.nil? - episode.episode_number..(episode_last&.episode_number || episode.episode_number) + @episode_numbers ||= episode.episode_number..(episode_last&.episode_number || episode.episode_number) end def uploaded? diff --git a/app/services/create_disks_service.rb b/app/services/create_disks_service.rb index 57c1e553..02ac1220 100644 --- a/app/services/create_disks_service.rb +++ b/app/services/create_disks_service.rb @@ -3,7 +3,7 @@ class CreateDisksService < ApplicationService include Shell - option :job, Types.Instance(Job) + option :listener, optional: true def call return [] if (drives = list_drives).empty? @@ -14,7 +14,7 @@ def call private def create_or_update_disks(drive) - find_or_initalize_disk(drive).tap do |disk| + find_or_initialize_disk(drive).tap do |disk| disk.update!(loading: true) broadcast(:disk_loading, disk) disk.disk_titles.each(&:mark_for_destruction) @@ -29,17 +29,20 @@ def create_or_update_disks(drive) def find_or_build_disk_titles(disk) disk_info(disk).each do |info| disk_title = find_or_build_disk_title(disk, info) + Rails.logger.debug { "found #{disk_title.to_label} " } disk_title.unmark_for_destruction end end - def find_or_initalize_disk(drive) - Disk.find_or_initialize_by(name: drive.drive_name, disk_name: drive.disc_name) + def find_or_initialize_disk(drive) + Disk.find_or_initialize_by(name: drive.drive_name, disk_name: drive.disc_name).tap do |disk| + disk.disk_titles.load + end end def disk_info(disk) service = DiskInfoService.new(disk_name: disk.disk_name) - service.subscribe(MkvDiskLoadListener.new(job:)) + service.subscribe(listener) if listener service.call end @@ -50,7 +53,9 @@ def find_or_build_disk_title(disk, title) title.filename == disk_title.filename && title.bytes == disk_title.size && title.duration_seconds == disk_title.duration && - title.angle == disk_title.angle + title.angle == disk_title.angle && + title.description == disk_title.description && + title.segment_map == disk_title.segment_map end || disk.disk_titles.build( title_id: title.id, name: title.name, @@ -58,7 +63,8 @@ def find_or_build_disk_title(disk, title) size: title.bytes, duration: title.duration_seconds, angle: title.angle, - description: title.description + description: title.description, + segment_map: title.segment_map ) end end diff --git a/app/services/create_mkv_service.rb b/app/services/create_mkv_service.rb index ce69000a..5dd1b198 100644 --- a/app/services/create_mkv_service.rb +++ b/app/services/create_mkv_service.rb @@ -60,8 +60,9 @@ def tmp_dir end def video_blob + scope = VideoBlob.includes(:video, :episode_last, episode: { season: :tv }) @video_blob ||= if extra_type == :feature_films || extra_type.blank? || edition.present? - VideoBlob.find_or_create_by!( + scope.find_or_create_by!( video: disk_title.video, episode: disk_title.episode, episode_last: disk_title.episode_last, @@ -70,7 +71,7 @@ def video_blob part: ) else - VideoBlob.create!( + scope.create!( video: disk_title.video, episode: disk_title.episode, episode_last: disk_title.episode_last, diff --git a/app/services/disk_info_service.rb b/app/services/disk_info_service.rb index dd2f9593..1defbf3c 100644 --- a/app/services/disk_info_service.rb +++ b/app/services/disk_info_service.rb @@ -9,6 +9,10 @@ class TitleInfo param :id, Types::Coercible::Integer + def segment_map + Array.wrap(@segment_map&.split(',')&.map(&:to_i)) + end + def duration_seconds hours, minutes, seconds = duration.split(':') seconds.to_i + (minutes.to_i * 60) + (hours.to_i * 60 * 60) diff --git a/app/services/episode_disk_title_selector_service.rb b/app/services/episode_disk_title_selector_service.rb index 174f0639..c6d1479f 100644 --- a/app/services/episode_disk_title_selector_service.rb +++ b/app/services/episode_disk_title_selector_service.rb @@ -28,11 +28,15 @@ def episode_disk_title(episode) return if disk.nil? return if uploaded?(episode) || ripped?(episode) - disk.disk_titles.find { within_range?(episode, _1) && selected_disk_titles.exclude?(_1) }.tap do |disk_title| + disk_titles.find { within_range?(episode, _1) && selected_disk_titles.exclude?(_1) }.tap do |disk_title| selected_disk_titles.append(disk_title) if disk_title end end + def disk_titles + disk.disk_titles.sort_by { _1.segment_map.sum } + end + def select_episode(disk_title) episode = sort_episodes.find { selected_episodes.exclude?(_1) && within_range?(_1, disk_title) } selected_episodes.append(episode) @@ -42,7 +46,7 @@ def select_episode(disk_title) def uploaded?(episode) uploaded_episodes_numbers.any? { _1.include?(episode.episode_number) } || episode.ripped_disk_titles.any? { _1&.video_blob&.uploaded? } || - episode.video_blobs.any?(&:uploaded?) + episode.uploaded_video_blobs.any? end def ripped?(episode) diff --git a/app/tool_box/mkv_parser.rb b/app/tool_box/mkv_parser.rb index 29593afd..23965b0a 100644 --- a/app/tool_box/mkv_parser.rb +++ b/app/tool_box/mkv_parser.rb @@ -10,7 +10,7 @@ module MkvParser 15 => :angle, 16 => :source_file_name, 25 => :segment_count, - 26 => :segement_map, + 26 => :segment_map, 27 => :filename, 28 => :lang, 29 => :language, diff --git a/app/views/seasons/show.html.erb b/app/views/seasons/show.html.erb index 1dc3354b..8dc77165 100644 --- a/app/views/seasons/show.html.erb +++ b/app/views/seasons/show.html.erb @@ -23,9 +23,9 @@ Title Select <% if @tv.duration_stats.weighted_average %> - (<%= distance_of_time_in_words(@tv.duration_range.min, 0, include_seconds: params[:debug] == 'true') %> + (<%= distance_of_time_in_words(@tv.duration_range.min, 0, include_seconds: params[:debug] == 'true', scope: "datetime.distance_in_words.short") %> .. - <%= distance_of_time_in_words(@tv.duration_range.max, 0, include_seconds: params[:debug] == 'true') %>) + <%= distance_of_time_in_words(@tv.duration_range.max, 0, include_seconds: params[:debug] == 'true', scope: "datetime.distance_in_words.short") %>) <% if params[:debug] == 'true' %>
@@ -92,14 +92,14 @@ <% if @disks.any? %> <%= select_tag 'episodes[][disk_title_id]', - options_from_collection_for_select(@disks.first.disk_titles, :id, :to_label, title.disk_title&.id), prompt: "", data: { manage_selections_target: :input }, class: 'form-select mb-2' %> + options_from_collection_for_select(@disks.first.disk_titles.sort_by { _1.segment_map.sum }, :id, :to_label, title.disk_title&.id), prompt: "", data: { manage_selections_target: :input }, class: 'form-select mb-2' %> <% end %> <% if title.episode.duration_stats.weighted_average %> - <%= distance_of_time_in_words(title.episode.duration_stats.weighted_average) %> + <%= distance_of_time_in_words(title.episode.duration_stats.weighted_average, 0) %> <% elsif @season.duration_stats.weighted_average%> - <%= distance_of_time_in_words(@tv.duration_stats.weighted_average) %> + <%= distance_of_time_in_words(@tv.duration_stats.weighted_average, 0) %> <% elsif title.episode.runtime %> - <%= distance_of_time_in_words(title.episode.runtime) %> + <%= distance_of_time_in_words(title.episode.runtime, 0) %> <% end %> diff --git a/app/workers/application_worker.rb b/app/workers/application_worker.rb index c60d9173..6ee60fa8 100644 --- a/app/workers/application_worker.rb +++ b/app/workers/application_worker.rb @@ -16,7 +16,7 @@ def perform_async(**args) found_job.save! Rails.logger.info("#{found_job.worker.class}.perform_async(#{found_job.id})") - semaphore.synchronize { enqueued_jobs.add(found_job.id) } + Backgrounder.add_job_id(found_job.id) found_job end @@ -25,32 +25,8 @@ def find_or_create_job(**args) Job.sort_by_created_at.active.find_or_initialize_by(name: to_s, arguments: args) end - def process_work - id = take_job_id - return if id.nil? - - Job.find_by(id:)&.perform - end - - # Stores the thread that this current worker is running - def threads - @@threads ||= {} # rubocop:disable Style/ClassVars - end - - def enqueued_jobs - @@enqueued_jobs ||= Job.enqueued.pluck(:id).to_set # rubocop:disable Style/ClassVars - end - - def take_job_id - semaphore.synchronize do - job_id = enqueued_jobs.first - enqueued_jobs.delete(job_id) if job_id.present? - job_id - end - end - - def semaphore - @@semaphore ||= Thread::Mutex.new # rubocop:disable Style/ClassVars + def concurrently + nil end end diff --git a/app/workers/load_disk_worker.rb b/app/workers/load_disk_worker.rb index b4650e13..f1dac26c 100644 --- a/app/workers/load_disk_worker.rb +++ b/app/workers/load_disk_worker.rb @@ -9,12 +9,14 @@ def enqueue? def perform Disk.where .not(id: Disk.verified_disks.select(:id)) + .includes(:disk_titles) .in_batches .destroy_all DiskTitle.not_ripped.in_batches.destroy_all - CreateDisksService.new(job:).tap do |service| + mkv_disk_load_listener = MkvDiskLoadListener.new(job:) + CreateDisksService.new(listener: mkv_disk_load_listener).tap do |service| service.subscribe(DiskListener.new(job:)) - service.subscribe(MkvDiskLoadListener.new(job:)) + service.subscribe(mkv_disk_load_listener) end.call end end diff --git a/app/workers/rip_worker.rb b/app/workers/rip_worker.rb index 1826b591..ea0f5a9c 100644 --- a/app/workers/rip_worker.rb +++ b/app/workers/rip_worker.rb @@ -31,8 +31,10 @@ def perform def create_mkvs disk_titles.filter_map do |disk_title| + title = DiskTitle.includes(:video, :episode, :episode_last, :disk) + .find(disk_title[:id]) service = CreateMkvService.new( - disk_title: DiskTitle.find(disk_title[:id]), + disk_title: title, extra_type: disk_title[:extra_type], edition: disk_title[:edition], part: disk_title[:part] diff --git a/app/workers/upload_worker.rb b/app/workers/upload_worker.rb index 90680049..3b316f67 100644 --- a/app/workers/upload_worker.rb +++ b/app/workers/upload_worker.rb @@ -7,8 +7,13 @@ def enqueue? true end + # Max Number of this job that can run at the same time + def concurrently + 2 + end + def perform - video_blob = VideoBlob.find(video_blob_id) + video_blob = VideoBlob.includes(:video, :episode_last, episode: { season: :tv }).find(video_blob_id) return unless video_blob.uploadable? service = Ftp::UploadMkvService.new(video_blob:) diff --git a/config/environments/development.rb b/config/environments/development.rb index 3a1bc73b..6b441646 100644 --- a/config/environments/development.rb +++ b/config/environments/development.rb @@ -61,4 +61,6 @@ # Use an evented file watcher to asynchronously detect changes in source code, # routes, locales, etc. This feature depends on the listen gem. config.file_watcher = ActiveSupport::EventedFileUpdateChecker + # Strict Load model data to make sure things are fast + config.active_record.strict_loading_by_default = true end diff --git a/config/environments/test.rb b/config/environments/test.rb index 2256f8c9..c06b5af7 100644 --- a/config/environments/test.rb +++ b/config/environments/test.rb @@ -48,4 +48,6 @@ # Raises error for missing translations. # config.action_view.raise_on_missing_translations = true + # Strict Load model data to make sure things are fast + config.active_record.strict_loading_by_default = true end diff --git a/config/initializers/backgrounder.rb b/config/initializers/backgrounder.rb index 20a29f47..1f94aa8a 100644 --- a/config/initializers/backgrounder.rb +++ b/config/initializers/backgrounder.rb @@ -6,7 +6,37 @@ # class Backgrounder - TOTAL_WORKERS = 3 + class Manager + def thread(&block) + @thread ||= Thread.new do + loop do + block.call(self) + sleep 0.1 + rescue StandardError => e + Rails.logger.error e.message + Rails.logger.error e.backtrace.join("\n") + nil + ensure + self.current_job = nil + end + end + end + + def stop + Thread.stop(@thread) + end + + def run + @thread.run + end + + def kill + Thread.kill(@thread) + end + + attr_accessor :current_job + end + TOTAL_MANAGERS = 5 CRON_TASKS = { 'ContinueUploadWorker' => 60.seconds.to_i, @@ -15,70 +45,111 @@ class Backgrounder 'CleanupJobWorker' => 1.hour.to_i }.freeze + @semaphore = Thread::Mutex.new + @enqueued_jobs = Set.new + class << self - # kick off the threads asynchronusly - # so that the main thread can continue + def managers + Array.wrap(@managers) + end + def start - Rails.logger.info "Starting background #{TOTAL_WORKERS} workers and scheduler" + Rails.logger.info "Starting background #{TOTAL_MANAGERS} managers + and scheduler" + fix_broken_jobs - @threads = (worker_threads + [scheduled_thread]) - @threads.each(&:run) + @enqueued_jobs = Job.enqueued.pluck(:id).to_set + + @managers = worker_managers + scheduler.run + @managers.each(&:run) end def shutdown - Rails.logger.info "Shutting down background #{TOTAL_WORKERS} workers and scheduler" + Rails.logger.info "Shutting down background #{TOTAL_MANAGERS} workers and scheduler" Timeout.timeout(5) do - Array.wrap(@threads).each { Thread.stop(_1) } + managers.each(&:stop) end rescue StandardError => e Rails.logger.warn "Warning: #{e.message}" - Array.wrap(@threads).each { Thread.kill(_1) } + managers.each(&:kill) + end + + def add_job_id(id) + semaphore.synchronize do + Rails.logger.debug { "#add_job_id(#{id}) enqueued_jobs: #{enqueued_jobs}" } + enqueued_jobs.add(id) + end end private def fix_broken_jobs Job.hanging.find_each do |job| - next if ApplicationWorker.threads[job.id].present? + next if managers.any? { _1.current_job&.id == job.id } job.update!(status: :errored, error_message: 'Job was marked as stopping but no worker was found to process it') end end - def schedule - @schedule ||= CRON_TASKS.to_h do |task, _| - [task, Time.current.to_i] + def worker_managers + TOTAL_MANAGERS.times.map do + manager = Manager.new + manager.thread { process_work(_1) } + manager end end - def worker_threads - TOTAL_WORKERS.times.map do - threaded_loop { ApplicationWorker.process_work } + def scheduler + Thread.new do + schedule = CRON_TASKS.to_h { [_1, Time.current.to_i] } + + loop do + CRON_TASKS.each do |task, duration| + next if Time.current.to_i < schedule[task] + + Object.const_get(task).perform_async + schedule[task] = Time.current.to_i + duration + sleep 1 # make sure we don't unique all at the same time + rescue StandardError => e + Rails.logger.error e.message + Rails.logger.error e.backtrace.join("\n") + end + end end end - def scheduled_thread - threaded_loop do - CRON_TASKS.each do |task, duration| - next if Time.current.to_i < schedule[task] + def process_work(manager) + job_id = take_job_id + return if job_id.nil? - Object.const_get(task).perform_async - schedule[task] = Time.current.to_i + duration - sleep 1 # make sure we don't unique all at the same time - end + manager.current_job = Job.find(job_id) + + if manager.current_job.name_constant.concurrently.nil? || + manager.current_job.name_constant.concurrently >= concurrent_count + Rails.logger.debug { "#process_work #{manager.current_job.id}" } + manager.current_job.perform + else + add_job_id(manager.current_job.id) end end - def threaded_loop(&block) - Thread.new do - loop do - block.call - sleep 1 - rescue StandardError => e - Rails.logger.error e.message - Rails.logger.error e.backtrace.join("\n") - nil + attr_reader :enqueued_jobs, :semaphore + + def concurrent_count + managers.count do |manager| + manager.current_job&.name == @manager.current_job.name + end + end + + def take_job_id + semaphore.synchronize do + job_id = enqueued_jobs.first + if job_id.present? + enqueued_jobs.delete(job_id) + Rails.logger.debug { "#take_job_id #{job_id || 'nil'} enqueued_jobs: #{enqueued_jobs}" } end + job_id end end end diff --git a/config/locales/en.yml b/config/locales/en.yml index 7f642961..064b1ebe 100644 --- a/config/locales/en.yml +++ b/config/locales/en.yml @@ -32,6 +32,56 @@ en: time: formats: - default: "%Y-%m-%d %H:%M:%S" - short: "%d %b %H:%M" - long: "%B %d, %Y %H:%M" + default: '%Y-%m-%d %H:%M:%S' + short: '%d %b %H:%M' + long: '%B %d, %Y %H:%M' + + datetime: + distance_in_words: + short: + hours: + one: '%{count} hr' + other: '%{count} hrs' + minutes: + one: '%{count} min' + other: '%{count} mins' + seconds: + one: '%{count} sec' + other: '%{count} secs' + about_x_hours: + one: ~%{count} hr + other: ~%{count} hrs + about_x_months: + one: ~%{count} mo + other: ~%{count} mos + about_x_years: + one: ~%{count} yr + other: ~%{count} yrs + almost_x_years: + one: ~%{count} yr + other: ~%{count} yrs + half_a_minute: ~30 sec + less_than_x_seconds: + one: < %{count} sec + other: < %{count} secs + less_than_x_minutes: + one: < 1 min + other: < %{count} mins + over_x_years: + one: '> %{count} yr' + other: '> %{count} yrs' + x_seconds: + one: '%{count} sec' + other: '%{count} secs' + x_minutes: + one: '%{count} min' + other: '%{count} mins' + x_days: + one: '%{count} day' + other: '%{count} days' + x_months: + one: '%{count} mo' + other: '%{count} mos' + x_years: + one: '%{count} yr' + other: '%{count} yrs' diff --git a/current_version.txt b/current_version.txt index ec7fd1aa..858aaccc 100644 --- a/current_version.txt +++ b/current_version.txt @@ -1 +1 @@ -v5.14.0 +v5.15.0 diff --git a/db/migrate/20241123005122_add_segment_map.rb b/db/migrate/20241123005122_add_segment_map.rb new file mode 100644 index 00000000..e1a0713d --- /dev/null +++ b/db/migrate/20241123005122_add_segment_map.rb @@ -0,0 +1,5 @@ +class AddSegmentMap < ActiveRecord::Migration[7.2] + def change + add_column :disk_titles, :segment_map, :string + end +end diff --git a/db/schema.rb b/db/schema.rb index 9f473055..c3077493 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.2].define(version: 2024_10_30_174837) do +ActiveRecord::Schema[7.2].define(version: 2024_11_23_005122) do create_table "configs", force: :cascade do |t| t.string "type", default: "Config", null: false t.text "settings" @@ -35,6 +35,7 @@ t.string "filename", null: false t.string "description" t.integer "episode_last_id" + t.string "segment_map" t.index ["disk_id"], name: "index_disk_titles_on_disk_id" t.index ["episode_id"], name: "index_disk_titles_on_episode_id" t.index ["episode_last_id"], name: "index_disk_titles_on_episode_last_id" diff --git a/spec/factories/disk_titles.rb b/spec/factories/disk_titles.rb index ab3eddc5..0b15f0cd 100644 --- a/spec/factories/disk_titles.rb +++ b/spec/factories/disk_titles.rb @@ -11,6 +11,7 @@ # filename :string not null # name :string # ripped_at :datetime +# segment_map :string # size :integer default(0), not null # created_at :datetime not null # updated_at :datetime not null diff --git a/spec/models/disk_title_spec.rb b/spec/models/disk_title_spec.rb index eea7e626..0aeaaae5 100644 --- a/spec/models/disk_title_spec.rb +++ b/spec/models/disk_title_spec.rb @@ -11,6 +11,7 @@ # filename :string not null # name :string # ripped_at :datetime +# segment_map :string # size :integer default(0), not null # created_at :datetime not null # updated_at :datetime not null @@ -55,7 +56,7 @@ let(:disk_title) { create(:disk_title, title_id: 1, name: 'Sample Title', duration: 3600) } it 'returns the correct label' do - expect(disk_title.to_label).to eq('#1 Sample Title 1 hour') + expect(disk_title.to_label).to eq('#1 title_mk1.mkv 1 hr') end end diff --git a/spec/services/create_mkv_service_spec.rb b/spec/services/create_mkv_service_spec.rb index ea194b44..d2f16ee9 100644 --- a/spec/services/create_mkv_service_spec.rb +++ b/spec/services/create_mkv_service_spec.rb @@ -49,7 +49,7 @@ it 'build a video blob with all the attributes on the disk title' do expect { call }.to change(VideoBlob, :count).by(1) expect(VideoBlob.first.part).to eq(1) - expect(VideoBlob.first.episode_last).to eq(episode) + expect(VideoBlob.includes(:episode_last).first.episode_last).to eq(episode) end end @@ -67,7 +67,7 @@ it 'build a video blob with all the attributes on the disk title' do expect { call }.to change(VideoBlob, :count).by(1) expect(VideoBlob.first.part).to eq(1) - expect(VideoBlob.first.episode_last).to eq(episode_last) + expect(VideoBlob.includes(:episode_last).first.episode_last).to eq(episode_last) end end end diff --git a/spec/services/disk_info_service_spec.rb b/spec/services/disk_info_service_spec.rb index 43c10f8d..afb885ed 100644 --- a/spec/services/disk_info_service_spec.rb +++ b/spec/services/disk_info_service_spec.rb @@ -21,7 +21,7 @@ 'size' => '1.8 GB', 'bytes' => 1_947_029_504, 'segment_count' => 1, - 'segement_map' => '1-10', + 'segment_map' => '1-10', 'filename' => 'title_t00.mkv', 'description' => '10 chapter(s), 1.8 GB' }, @@ -32,7 +32,7 @@ 'size' => '2.1 GB', 'bytes' => 2_321_354_752, 'segment_count' => 1, - 'segement_map' => '1-11', + 'segment_map' => '1-11', 'filename' => 'title_t01.mkv', 'description' => '11 chapter(s), 2.1 GB' }, @@ -43,7 +43,7 @@ 'size' => '1.6 GB', 'bytes' => 1_810_173_952, 'segment_count' => 1, - 'segement_map' => '1-10', + 'segment_map' => '1-10', 'filename' => 'title_t02.mkv', 'description' => '10 chapter(s), 1.6 GB' }, @@ -54,7 +54,7 @@ 'size' => '1.7 GB', 'bytes' => 1_870_516_224, 'segment_count' => 1, - 'segement_map' => '1-10', + 'segment_map' => '1-10', 'filename' => 'title_t03.mkv', 'description' => '10 chapter(s), 1.7 GB' } diff --git a/spec/services/episode_disk_title_selector_service_spec.rb b/spec/services/episode_disk_title_selector_service_spec.rb index 385a83a6..98a1ab22 100644 --- a/spec/services/episode_disk_title_selector_service_spec.rb +++ b/spec/services/episode_disk_title_selector_service_spec.rb @@ -6,6 +6,15 @@ describe '#call' do subject(:call) { described_class.new(episodes:, disk:).call } + before do + episodes.each do |episode| + episode.association(:uploaded_video_blobs).loaded! + episode.association(:ripped_disk_titles).loaded! + end + tv.association(:episodes).target = episodes + tv.association(:episodes).loaded! + end + context 'when there is a single disk_title' do let(:episodes) { build_stubbed_list(:episode, 3, season:, runtime: 10.minutes) } let(:season) { build_stubbed(:season, tv:) } diff --git a/spec/services/movie_disk_title_selector_service_spec.rb b/spec/services/movie_disk_title_selector_service_spec.rb index 39acb8ca..87c58988 100644 --- a/spec/services/movie_disk_title_selector_service_spec.rb +++ b/spec/services/movie_disk_title_selector_service_spec.rb @@ -6,6 +6,8 @@ describe '#call' do subject(:call) { described_class.new(movie:, disk:).call } + before { movie.association(:ripped_disk_titles).loaded! } + let(:movie) { build_stubbed(:movie, movie_runtime: 10.seconds) } let(:disk) { build_stubbed(:disk, disk_titles: [disk_title]) } let(:disk_title) { build_stubbed(:disk_title, duration: 10.seconds) }