Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

respect the state update counter if present #90

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions lib/travis/hub/handler.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
require 'travis/hub/helper/context'
require 'travis/hub/helper/hash'
require 'travis/hub/helper/string'
require 'travis/hub/service'

module Travis
module Hub
class Handler
include Helper::Context, Helper::String
include Helper::Context, Helper::Hash, Helper::String

attr_reader :context, :type, :event, :payload, :object

Expand All @@ -31,6 +32,7 @@ def run

def handle
const = Service.const_get("Update#{camelize(type)}")
p [event, payload]
const.new(context, event, payload).run
end

Expand All @@ -45,7 +47,7 @@ def normalize_event(event)
end

def normalize_payload(payload)
payload = payload.symbolize_keys
payload = deep_symbolize_keys(payload)
payload = normalize_state(payload)
normalize_timestamps(payload)
end
Expand Down
21 changes: 21 additions & 0 deletions lib/travis/hub/helper/hash.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module Travis
module Hub
module Helper
module Hash
def deep_symbolize_keys(hash)
hash.map do |key, obj|
obj = case obj
when Array
obj.map { |obj| deep_symbolize_keys(obj) }
when ::Hash
deep_symbolize_keys(obj)
else
obj
end
[key.to_sym, obj]
end.to_h
end
end
end
end
end
106 changes: 106 additions & 0 deletions lib/travis/hub/service/state_update.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
module Travis
module Hub
module Service
class StateUpdate < Struct.new(:event, :data, :block)
class Counter < Struct.new(:job_id, :redis)
TTL = 3600 * 24

def count
@count ||= redis.get(key).to_i
end

def store(count)
redis.set(key, count)
redis.expire(key, TTL)
end

private

def key
"job:state_update_count:#{job_id}"
end
end

include Helper::Context

OUT_OF_BAND = [:cancel, :restart]

MSGS = {
missing: 'Received state update (%p) with no count for job id=%p, last known count: %p.',
ordered: 'Received state update %p (%p) for job id=%p, last known count: %p',
unordered: 'Received state update %p (%p) for job id=%p, last known count: %p. Skipping the message.',
}

def apply
if !enabled? || out_of_band?
call
elsif missing?
missing
elsif ordered?
ordered
else
unordered
end
end

private

def call
block.call
end

def enabled?
!!ENV['UPDATE_COUNT']
end

def out_of_band?
OUT_OF_BAND.include?(event)
end

def missing?
count.nil?
end

def missing
warn :missing, event, job_id, counter.count
call
store
end

def ordered
info :ordered, count, event, job_id, counter.count
call
store
end

def unordered
warn :unordered, count, event, job_id, counter.count
end

def ordered?
count >= counter.count
end

def store
counter.store(count)
end

def counter
@counter ||= Counter.new(job_id, redis)
end

def job_id
data[:id]
end

def count
meta[:state_update_count]
end

def meta
data[:meta] || {}
end
end
end
end
end
13 changes: 10 additions & 3 deletions lib/travis/hub/service/update_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'travis/hub/helper/locking'
require 'travis/hub/model/job'
require 'travis/hub/service/error_job'
require 'travis/hub/service/state_update'
require 'travis/hub/service/notify_workers'
require 'travis/hub/helper/limit'

Expand All @@ -16,14 +17,16 @@ class UpdateJob < Struct.new(:event, :data)
EVENTS = [:receive, :reset, :start, :finish, :cancel, :restart]

MSGS = {
skipped: 'Skipped event job:%s for <Job id=%s> trying to update state from %s to %s data=%s',
skipped: 'Skipped event job:%s for <Job id=%s> trying to update state from %p to %p data=%s',
}

def run
exclusive do
validate
update_job
notify
with_state_update do
update_job
notify
end
end
end
instrument :run
Expand Down Expand Up @@ -72,6 +75,10 @@ def skipped
warn :skipped, event, job.id, job.state, data[:state], data
end

def with_state_update(&block)
StateUpdate.new(context, event, data, block).apply
end

def resets
@resets ||= Limit.new(redis, :resets, job.id, config.limit.resets)
end
Expand Down
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
Travis::Event.instance_variable_set(:@subscriptions, nil)
# Travis::Addons.setup({ host: 'host.com', encryption: { key: 'secret' * 10 } }, logger)
Time.stubs(:now).returns(NOW)
context.redis.flushall
end
end

Expand Down
1 change: 1 addition & 0 deletions spec/support/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module Context

included do
let(:stdout) { StringIO.new }
let(:log) { stdout.string }
let(:logger) { Travis::Logger.new(stdout) }
let(:context) { Travis::Hub::Context.new(logger: logger) }
before { Travis::Hub.context = context }
Expand Down
63 changes: 63 additions & 0 deletions spec/travis/hub/service/update_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,67 @@ def recieve(msg)
expect(job.reload.state).to eql :passed
end
end

describe 'state update count' do
let(:job) { FactoryGirl.create(:job, state: :received) }
let(:event) { :start }
let(:data) { { id: job.id, state: :started, meta: meta } }

before { ENV['UPDATE_COUNT'] = 'true' }
after { ENV['UPDATE_COUNT'] = nil }

describe 'with no count stored' do
describe 'given no meta' do
let(:meta) { nil }
before { subject.run }

it { expect(job.reload.state).to eq :started }
it { expect(log).to include "W Received state update (:start) with no count for job id=#{job.id}, last known count: 0" }
end

describe 'given no count' do
let(:meta) { {} }
before { subject.run }

it { expect(job.reload.state).to eq :started }
it { expect(log).to include "W Received state update (:start) with no count for job id=#{job.id}, last known count: 0" }
end

describe 'given a count' do
let(:meta) { { state_update_count: 2 } }
before { subject.run }

it { expect(job.reload.state).to eq :started }
it { expect(log).to include "I Received state update 2 (:start) for job id=#{job.id}, last known count: 0" }
end
end

describe 'with a count stored' do
before { context.redis.set("job:state_update_count:#{job.id}", 3) }

describe 'given no meta it skips the message' do
let(:meta) { nil }
before { subject.run }

it { expect(job.reload.state).to eq :started }
it { expect(log).to include "W Received state update (:start) with no count for job id=#{job.id}, last known count: 3" }
end

describe 'given no count it skips the message' do
let(:meta) { {} }
before { subject.run }

it { expect(job.reload.state).to eq :started }
it { expect(log).to include "W Received state update (:start) with no count for job id=#{job.id}, last known count: 3" }
end

describe 'given a count it skips the message' do
let(:meta) { { state_update_count: 2 } }
before { subject.run }

it { expect(job.reload.state).to eq :received }
it { expect(log).to include "W Received state update 2 (:start) for job id=#{job.id}, last known count: 3. Skipping the message." }
end
end
end
end