From a707c74a098bdbbd3a5a717e79890e233230c04c Mon Sep 17 00:00:00 2001 From: Sven Fuchs Date: Fri, 18 Nov 2016 12:25:39 +0100 Subject: [PATCH] respect the state update counter if present --- lib/travis/hub/handler.rb | 6 +- lib/travis/hub/helper/hash.rb | 21 ++++ lib/travis/hub/service/state_update.rb | 106 +++++++++++++++++++++ lib/travis/hub/service/update_job.rb | 13 ++- spec/spec_helper.rb | 1 + spec/support/context.rb | 1 + spec/travis/hub/service/update_job_spec.rb | 63 ++++++++++++ 7 files changed, 206 insertions(+), 5 deletions(-) create mode 100644 lib/travis/hub/helper/hash.rb create mode 100644 lib/travis/hub/service/state_update.rb diff --git a/lib/travis/hub/handler.rb b/lib/travis/hub/handler.rb index f19855eb2..6321490d4 100644 --- a/lib/travis/hub/handler.rb +++ b/lib/travis/hub/handler.rb @@ -1,11 +1,12 @@ require 'travis/hub/helper/context' +require 'travis/hub/helper/hash' require 'travis/hub/helper/string' require 'travis/hub/service' module Travis module Hub class Handler - include Helper::Context, Helper::String + include Helper::Context, Helper::Hash, Helper::String attr_reader :context, :type, :event, :payload, :object @@ -31,6 +32,7 @@ def run def handle const = Service.const_get("Update#{camelize(type)}") + p [event, payload] const.new(context, event, payload).run end @@ -45,7 +47,7 @@ def normalize_event(event) end def normalize_payload(payload) - payload = payload.symbolize_keys + payload = deep_symbolize_keys(payload) payload = normalize_state(payload) normalize_timestamps(payload) end diff --git a/lib/travis/hub/helper/hash.rb b/lib/travis/hub/helper/hash.rb new file mode 100644 index 000000000..78d1507a3 --- /dev/null +++ b/lib/travis/hub/helper/hash.rb @@ -0,0 +1,21 @@ +module Travis + module Hub + module Helper + module Hash + def deep_symbolize_keys(hash) + hash.map do |key, obj| + obj = case obj + when Array + obj.map { |obj| deep_symbolize_keys(obj) } + when ::Hash + deep_symbolize_keys(obj) + else + obj + end + [key.to_sym, obj] + end.to_h + end + end + end + end +end diff --git a/lib/travis/hub/service/state_update.rb b/lib/travis/hub/service/state_update.rb new file mode 100644 index 000000000..cd4051429 --- /dev/null +++ b/lib/travis/hub/service/state_update.rb @@ -0,0 +1,106 @@ +module Travis + module Hub + module Service + class StateUpdate < Struct.new(:event, :data, :block) + class Counter < Struct.new(:job_id, :redis) + TTL = 3600 * 24 + + def count + @count ||= redis.get(key).to_i + end + + def store(count) + redis.set(key, count) + redis.expire(key, TTL) + end + + private + + def key + "job:state_update_count:#{job_id}" + end + end + + include Helper::Context + + OUT_OF_BAND = [:cancel, :restart] + + MSGS = { + missing: 'Received state update (%p) with no count for job id=%p, last known count: %p.', + ordered: 'Received state update %p (%p) for job id=%p, last known count: %p', + unordered: 'Received state update %p (%p) for job id=%p, last known count: %p. Skipping the message.', + } + + def apply + if !enabled? || out_of_band? + call + elsif missing? + missing + elsif ordered? + ordered + else + unordered + end + end + + private + + def call + block.call + end + + def enabled? + !!ENV['UPDATE_COUNT'] + end + + def out_of_band? + OUT_OF_BAND.include?(event) + end + + def missing? + count.nil? + end + + def missing + warn :missing, event, job_id, counter.count + call + store + end + + def ordered + info :ordered, count, event, job_id, counter.count + call + store + end + + def unordered + warn :unordered, count, event, job_id, counter.count + end + + def ordered? + count >= counter.count + end + + def store + counter.store(count) + end + + def counter + @counter ||= Counter.new(job_id, redis) + end + + def job_id + data[:id] + end + + def count + meta[:state_update_count] + end + + def meta + data[:meta] || {} + end + end + end + end +end diff --git a/lib/travis/hub/service/update_job.rb b/lib/travis/hub/service/update_job.rb index ffa10b541..278123096 100644 --- a/lib/travis/hub/service/update_job.rb +++ b/lib/travis/hub/service/update_job.rb @@ -3,6 +3,7 @@ require 'travis/hub/helper/locking' require 'travis/hub/model/job' require 'travis/hub/service/error_job' +require 'travis/hub/service/state_update' require 'travis/hub/service/notify_workers' require 'travis/hub/helper/limit' @@ -16,14 +17,16 @@ class UpdateJob < Struct.new(:event, :data) EVENTS = [:receive, :reset, :start, :finish, :cancel, :restart] MSGS = { - skipped: 'Skipped event job:%s for trying to update state from %s to %s data=%s', + skipped: 'Skipped event job:%s for trying to update state from %p to %p data=%s', } def run exclusive do validate - update_job - notify + with_state_update do + update_job + notify + end end end instrument :run @@ -72,6 +75,10 @@ def skipped warn :skipped, event, job.id, job.state, data[:state], data end + def with_state_update(&block) + StateUpdate.new(context, event, data, block).apply + end + def resets @resets ||= Limit.new(redis, :resets, job.id, config.limit.resets) end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index bb179003b..67c579252 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -75,6 +75,7 @@ Travis::Event.instance_variable_set(:@subscriptions, nil) # Travis::Addons.setup({ host: 'host.com', encryption: { key: 'secret' * 10 } }, logger) Time.stubs(:now).returns(NOW) + context.redis.flushall end end diff --git a/spec/support/context.rb b/spec/support/context.rb index 282ada88c..de10598cd 100644 --- a/spec/support/context.rb +++ b/spec/support/context.rb @@ -6,6 +6,7 @@ module Context included do let(:stdout) { StringIO.new } + let(:log) { stdout.string } let(:logger) { Travis::Logger.new(stdout) } let(:context) { Travis::Hub::Context.new(logger: logger) } before { Travis::Hub.context = context } diff --git a/spec/travis/hub/service/update_job_spec.rb b/spec/travis/hub/service/update_job_spec.rb index 446cdb7f6..0f0e77427 100644 --- a/spec/travis/hub/service/update_job_spec.rb +++ b/spec/travis/hub/service/update_job_spec.rb @@ -210,4 +210,67 @@ def recieve(msg) expect(job.reload.state).to eql :passed end end + + describe 'state update count' do + let(:job) { FactoryGirl.create(:job, state: :received) } + let(:event) { :start } + let(:data) { { id: job.id, state: :started, meta: meta } } + + before { ENV['UPDATE_COUNT'] = 'true' } + after { ENV['UPDATE_COUNT'] = nil } + + describe 'with no count stored' do + describe 'given no meta' do + let(:meta) { nil } + before { subject.run } + + it { expect(job.reload.state).to eq :started } + it { expect(log).to include "W Received state update (:start) with no count for job id=#{job.id}, last known count: 0" } + end + + describe 'given no count' do + let(:meta) { {} } + before { subject.run } + + it { expect(job.reload.state).to eq :started } + it { expect(log).to include "W Received state update (:start) with no count for job id=#{job.id}, last known count: 0" } + end + + describe 'given a count' do + let(:meta) { { state_update_count: 2 } } + before { subject.run } + + it { expect(job.reload.state).to eq :started } + it { expect(log).to include "I Received state update 2 (:start) for job id=#{job.id}, last known count: 0" } + end + end + + describe 'with a count stored' do + before { context.redis.set("job:state_update_count:#{job.id}", 3) } + + describe 'given no meta it skips the message' do + let(:meta) { nil } + before { subject.run } + + it { expect(job.reload.state).to eq :started } + it { expect(log).to include "W Received state update (:start) with no count for job id=#{job.id}, last known count: 3" } + end + + describe 'given no count it skips the message' do + let(:meta) { {} } + before { subject.run } + + it { expect(job.reload.state).to eq :started } + it { expect(log).to include "W Received state update (:start) with no count for job id=#{job.id}, last known count: 3" } + end + + describe 'given a count it skips the message' do + let(:meta) { { state_update_count: 2 } } + before { subject.run } + + it { expect(job.reload.state).to eq :received } + it { expect(log).to include "W Received state update 2 (:start) for job id=#{job.id}, last known count: 3. Skipping the message." } + end + end + end end