Quiet job continuation #1133

Open · wants to merge 16 commits into base: main
34 changes: 28 additions & 6 deletions app/jobs/publish_feed_job.rb
@@ -32,6 +32,12 @@ def publish_apple(podcast, feed)
res = PublishAppleJob.do_perform(podcast.apple_config)
PublishingPipelineState.publish_apple!(podcast)
res
rescue Apple::AssetStateTimeoutError => e
# Not strictly a 'fail state' because we want to retry this job
PublishingPipelineState.error_apple!(podcast)
Rails.logger.send(e.log_level, e.message, {podcast_id: podcast.id})
NewRelic::Agent.notice_error(e)
raise e if podcast.apple_config.sync_blocks_rss
rescue => e
if podcast.apple_config.sync_blocks_rss
fail_state(podcast, "apple", e)
@@ -49,16 +55,32 @@ def publish_rss(podcast, feed)
fail_state(podcast, "rss", e)
end

def fail_state(podcast, type, error)
(pipeline_method, log_level) = case type
when "apple" then [:error_apple!, :warn]
when "rss" then [:error_rss!, :warn]
when "apple_timeout", "error" then [:error!, :error]
def apple_timeout_log_level(error)
error.try(:log_level) || :error
end

def should_raise?(error)
if error.respond_to?(:raise_publishing_error?)
error.raise_publishing_error?
else
true
end
end

def fail_state(podcast, type, error)
(pipeline_method, log_level) =
case type
when "apple" then [:error_apple!, :warn]
when "rss" then [:error_rss!, :warn]
when "apple_timeout"
level = apple_timeout_log_level(error)
[:retry!, level]
when "error" then [:error!, :error]
end

PublishingPipelineState.public_send(pipeline_method, podcast)
Rails.logger.send(log_level, error.message, {podcast_id: podcast.id})
raise error
raise error if should_raise?(error)
end

def save_file(podcast, feed, options = {})
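Reviewer sketch (not part of the diff): how the new `should_raise?` dispatch behaves, with a purely illustrative error class standing in for the Apple timeout error.

```ruby
# Illustrative only: should_raise? defers to the error object when it can answer.
def should_raise?(error)
  error.respond_to?(:raise_publishing_error?) ? error.raise_publishing_error? : true
end

should_raise?(RuntimeError.new("some apple error")) # => true; plain errors still re-raise

# Hypothetical stand-in for an error that opts out of re-raising
quiet_error_class = Class.new(StandardError) do
  def raise_publishing_error?
    false
  end
end
should_raise?(quiet_error_class.new("asset still processing")) # => false; fail_state records the state and returns quietly
```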
15 changes: 15 additions & 0 deletions app/models/apple/asset_state_timeout_error.rb
@@ -12,5 +12,20 @@ def initialize(episodes)
def episode_ids
episodes.map(&:feeder_id)
end

def raise_publishing_error?
%i[error fatal].include?(log_level)
end

def log_level
case attempts
when 0..4
:warn
when 5
:error
else
:fatal
end
end
end
end
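A quick summary of the escalation this encodes, assuming `attempts` counts how many times the pipeline has already waited on Apple asset processing:

```ruby
# Illustrative mapping, not part of the diff:
#   attempts 0..4 -> log_level :warn  -> raise_publishing_error? false (pipeline retries quietly)
#   attempts 5    -> log_level :error -> raise_publishing_error? true  (error is re-raised)
#   attempts 6+   -> log_level :fatal -> raise_publishing_error? true  (error is re-raised)
```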
12 changes: 12 additions & 0 deletions app/models/apple/episode_delivery_status.rb
@@ -2,6 +2,18 @@ module Apple
class EpisodeDeliveryStatus < ApplicationRecord
belongs_to :episode, -> { with_deleted }, class_name: "::Episode"

def self.measure_asset_processing_duration(apple_episode_delivery_statuses)
statuses = apple_episode_delivery_statuses.to_a

last_status = statuses.shift
return nil unless last_status&.asset_processing_attempts.to_i.positive?

start_status = statuses.find { |status| status.asset_processing_attempts.to_i.zero? }
return nil unless start_status

Time.now - start_status.created_at
end

def self.update_status(episode, attrs)
new_status = (episode.apple_episode_delivery_status&.dup || default_status(episode))
new_status.assign_attributes(attrs)
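Hypothetical usage of the extracted class method, assuming the passed-in relation yields statuses newest-first (which the leading `shift` implies):

```ruby
statuses = episode.apple_episode_delivery_statuses # assumed newest-first
Apple::EpisodeDeliveryStatus.measure_asset_processing_duration(statuses)
# => seconds elapsed since the most recent zero-attempt status was created,
#    or nil if the newest status has no recorded processing attempts
```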
12 changes: 2 additions & 10 deletions app/models/concerns/apple_delivery.rb
@@ -40,23 +40,15 @@ def apple_needs_delivery?
end

def apple_needs_delivery!
apple_update_delivery_status(delivered: false)
apple_update_delivery_status(delivered: false, asset_processing_attempts: 0)
end

def apple_has_delivery!
apple_update_delivery_status(delivered: true)
end

def measure_asset_processing_duration
statuses = apple_episode_delivery_statuses.to_a

last_status = statuses.shift
return nil unless last_status&.asset_processing_attempts.to_i.positive?

start_status = statuses.find { |status| status.asset_processing_attempts.to_i.zero? }
return nil unless start_status

Time.now - start_status.created_at
Apple::EpisodeDeliveryStatus.measure_asset_processing_duration(apple_episode_delivery_statuses)
end

def apple_prepare_for_delivery!
11 changes: 8 additions & 3 deletions app/models/publishing_pipeline_state.rb
@@ -1,6 +1,6 @@
class PublishingPipelineState < ApplicationRecord
TERMINAL_STATUSES = [:complete, :error, :expired].freeze
TERMINAL_FAILURE_STATUSES = [:error, :expired].freeze
TERMINAL_STATUSES = [:complete, :error, :expired, :retry].freeze
TERMINAL_FAILURE_STATUSES = [:error, :expired, :retry].freeze
UNIQUE_STATUSES = TERMINAL_STATUSES + [:created, :started]

# Handle the max timeout for a publishing pipeline: Pub RSS job + Pub Apple job + a few extra minutes of flight
@@ -53,7 +53,8 @@ class PublishingPipelineState < ApplicationRecord
:error,
:expired,
:error_apple,
:error_rss
:error_rss,
:retry
]

validate :podcast_ids_match
@@ -170,6 +171,10 @@ def self.expire!(podcast)
state_transition(podcast, :expired)
end

def self.retry!(podcast)
state_transition(podcast, :retry)
end

def self.expire_pipelines!
Podcast.with_deleted.where(id: expired_pipelines.select(:podcast_id)).each do |podcast|
Rails.logger.tagged("PublishingPipeLineState.expire_pipelines!", "Podcast:#{podcast.id}") do
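Caller-side sketch of the new transition, assuming the existing `retry_failed_pipelines!` sweep picks up the latest pipelines that ended in a `TERMINAL_FAILURE_STATUSES` state (which now includes `:retry`):

```ruby
# Mark the current pipeline as terminal but retryable instead of failed outright
PublishingPipelineState.retry!(podcast)

# Later, the existing sweep re-enqueues podcasts whose latest pipeline ended in a failure status
PublishingPipelineState.retry_failed_pipelines!
```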
79 changes: 62 additions & 17 deletions test/jobs/publish_feed_job_test.rb
@@ -99,12 +99,16 @@
end

describe "publishing to apple" do
let(:podcast) { create(:podcast) }
let(:public_feed) { podcast.default_feed }
let(:private_feed) { create(:apple_feed, podcast: podcast) }
let(:apple_feed) { private_feed }
let(:apple_config) { podcast.apple_config }
let(:apple_config) { private_feed.apple_config }
let(:apple_publisher) { apple_config.build_publisher }

before do
assert private_feed.persisted?
assert podcast.reload.apple_config.present?
assert apple_config.persisted?
end

describe "#perform" do
@@ -151,30 +155,55 @@
PublishingQueueItem.create!(podcast: feed.podcast)
end

it "raises an error if the apple publishing fails" do
let(:episode1) { build(:uploaded_apple_episode, show: apple_publisher.show) }
let(:episode2) { build(:uploaded_apple_episode, show: apple_publisher.show) }
let(:episodes) { [episode1, episode2] }

it "logs message if the apple publishing times out" do
assert apple_feed.apple_config.present?
assert apple_feed.apple_config.publish_enabled

PublishAppleJob.stub(:do_perform, ->(*, **) { raise "some apple error" }) do
assert_raises(RuntimeError) { PublishingPipelineState.attempt!(feed.podcast, perform_later: false) }

assert_equal ["created", "started", "error", "error_apple"].sort, PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.pluck(:status).sort
expected_level_for_timeouts = [
[0, 40],
[1, 40],
[2, 40],
[3, 40],
[4, 40],
[5, 50],
[6, 60]
]

expected_level_for_timeouts.each do |(attempts, level)|
# simulate an episode waiting n times
episodes.first.apple_episode_delivery_status.update(asset_processing_attempts: attempts)

PublishFeedJob.stub(:s3_client, stub_client) do
PublishAppleJob.stub(:do_perform, ->(*, **) { raise Apple::AssetStateTimeoutError.new(episodes) }) do
lines = capture_json_logs do
PublishingQueueItem.ensure_queued!(feed.podcast)
PublishingPipelineState.attempt!(feed.podcast, perform_later: false)
rescue
nil
end

log = lines.find { |l| l["msg"].include?("Timeout waiting for asset state change") }
assert log.present?
assert_equal level, log["level"]

assert_equal ["created", "started", "error_apple", "retry"], PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.order(:id).pluck(:status)
end
end
end
end

it "does not raise an error if the apple publishing fails and apple sync does not block rss publishing" do
it "raises an error if the apple publishing times out" do
assert apple_feed.apple_config.present?
assert apple_feed.apple_config.publish_enabled
apple_feed.apple_config.update!(sync_blocks_rss: false)
feed.reload

PublishFeedJob.stub(:s3_client, stub_client) do
PublishAppleJob.stub(:do_perform, ->(*, **) { raise "some apple error" }) do
# no error raised
PublishingPipelineState.attempt!(feed.podcast, perform_later: false)
PublishAppleJob.stub(:do_perform, ->(*, **) { raise Apple::AssetStateTimeoutError.new([]) }) do
assert_raises(Apple::AssetStateTimeoutError) { PublishingPipelineState.attempt!(feed.podcast, perform_later: false) }

assert_equal ["created", "started", "error_apple", "published_rss", "published_rss", "published_rss", "complete"].sort, PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.pluck(:status).sort
end
assert_equal ["created", "started", "error_apple", "retry"], PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.order(id: :asc).pluck(:status)
end
end

@@ -185,7 +214,23 @@
PublishAppleJob.stub(:do_perform, ->(*, **) { raise Apple::AssetStateTimeoutError.new([]) }) do
assert_raises(Apple::AssetStateTimeoutError) { PublishingPipelineState.attempt!(feed.podcast, perform_later: false) }

assert_equal ["created", "started", "error", "error_apple"].sort, PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.pluck(:status).sort
assert_equal ["created", "started", "error_apple", "retry"], PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.order(id: :asc).pluck(:status)
end
end

it "does not raise an error if the apple publishing fails and apple sync does not block rss publishing" do
assert apple_feed.apple_config.present?
assert apple_feed.apple_config.publish_enabled
apple_feed.apple_config.update!(sync_blocks_rss: false)
feed.reload

PublishFeedJob.stub(:s3_client, stub_client) do
PublishAppleJob.stub(:do_perform, ->(*, **) { raise "some apple error" }) do
# no error raised
PublishingPipelineState.attempt!(feed.podcast, perform_later: false)

assert_equal ["created", "started", "error_apple", "published_rss", "published_rss", "published_rss", "complete"].sort, PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.pluck(:status).sort
end
end
end
end
35 changes: 35 additions & 0 deletions test/models/publishing_pipeline_state_test.rb
@@ -273,6 +273,41 @@
end
end

describe "retry!" do
let(:podcast) { create(:podcast) }
let(:public_feed) { podcast.default_feed }
let(:private_feed) { create(:apple_feed, podcast: podcast) }
let(:apple_feed) { private_feed }
let(:apple_config) { private_feed.apple_config }
let(:apple_publisher) { apple_config.build_publisher }

it 'sets the status to "retry"' do
episode = build(:uploaded_apple_episode, show: apple_publisher.show)

# it does not trigger an exception
episode.apple_episode_delivery_status.update(asset_processing_attempts: 1)

pqi = nil
PublishFeedJob.stub_any_instance(:save_file, nil) do
PublishAppleJob.stub(:do_perform, ->(*args) { raise Apple::AssetStateTimeoutError.new([episode]) }) do
pqi = PublishingQueueItem.ensure_queued!(podcast)
PublishingPipelineState.attempt!(podcast, perform_later: false)
end
end

assert_equal ["created", "started", "error_apple", "retry"], PublishingPipelineState.where(podcast: podcast).order(:id).pluck(:status)
assert_equal "retry", pqi.reload.last_pipeline_state

# it retries
PublishingPipelineState.retry_failed_pipelines!
assert_equal ["created", "started", "error_apple", "retry", "created"], PublishingPipelineState.where(podcast: podcast).order(:id).pluck(:status)
res_pqi = PublishingQueueItem.current_unfinished_item(podcast)

assert res_pqi.id > pqi.id
assert_equal "created", res_pqi.last_pipeline_state
end
end

describe "complete!" do
it 'sets the status to "complete"' do
PublishFeedJob.stub_any_instance(:save_file, nil) do