diff --git a/lib/rudder/analytics/client.rb b/lib/rudder/analytics/client.rb index b1cdf1e..272486e 100644 --- a/lib/rudder/analytics/client.rb +++ b/lib/rudder/analytics/client.rb @@ -24,6 +24,7 @@ class Client # @option opts [FixNum] :max_queue_size Maximum number of calls to be # remain queued. # @option opts [Proc] :on_error Handles error calls from the API. + # @option opts [Proc] :on_error_with_messages Handles error calls from the API, with failed messages. def initialize(opts = {}) @config = Configuration.new(opts) @queue = Queue.new diff --git a/lib/rudder/analytics/configuration.rb b/lib/rudder/analytics/configuration.rb index 8149a1f..0906657 100644 --- a/lib/rudder/analytics/configuration.rb +++ b/lib/rudder/analytics/configuration.rb @@ -7,7 +7,7 @@ class Analytics class Configuration include Rudder::Analytics::Utils - attr_reader :write_key, :data_plane_url, :on_error, :stub, :gzip, :ssl, :batch_size, :test, :max_queue_size, :backoff_policy, :retries + attr_reader :write_key, :data_plane_url, :on_error, :on_error_with_messages, :stub, :gzip, :ssl, :batch_size, :test, :max_queue_size, :backoff_policy, :retries def initialize(settings = {}) symbolized_settings = symbolize_keys(settings) @@ -18,6 +18,7 @@ def initialize(settings = {}) @max_queue_size = symbolized_settings[:max_queue_size] || Defaults::Queue::MAX_SIZE @ssl = symbolized_settings[:ssl] @on_error = symbolized_settings[:on_error] || proc { |status, error| } + @on_error_with_messages = symbolized_settings[:on_error_with_messages] || proc { |status, error, messages| } @stub = symbolized_settings[:stub] @batch_size = symbolized_settings[:batch_size] || Defaults::MessageBatch::MAX_SIZE @gzip = symbolized_settings[:gzip] diff --git a/lib/rudder/analytics/message_batch.rb b/lib/rudder/analytics/message_batch.rb index 60811c5..0d0366e 100644 --- a/lib/rudder/analytics/message_batch.rb +++ b/lib/rudder/analytics/message_batch.rb @@ -26,7 +26,7 @@ def <<(message) message_json = message.to_json # puts message_json rescue StandardError => e - raise JSONGenerationError, "Serialization error: #{e}" + raise JSONGenerationError, "Serialization error: #{e} for message: #{message.inspect}" end message_json_size = message_json.bytesize diff --git a/lib/rudder/analytics/version.rb b/lib/rudder/analytics/version.rb index a5e0056..f3f08c4 100644 --- a/lib/rudder/analytics/version.rb +++ b/lib/rudder/analytics/version.rb @@ -2,6 +2,6 @@ module Rudder class Analytics - VERSION = '3.0.1' + VERSION = '3.1.0' end end diff --git a/lib/rudder/analytics/worker.rb b/lib/rudder/analytics/worker.rb index ff18190..ae812cf 100644 --- a/lib/rudder/analytics/worker.rb +++ b/lib/rudder/analytics/worker.rb @@ -29,6 +29,7 @@ def initialize(queue, config) @write_key = config.write_key @ssl = config.ssl @on_error = config.on_error + @on_error_with_messages = config.on_error_with_messages @batch = MessageBatch.new(config.batch_size) @lock = Mutex.new @transport = Transport.new(config) @@ -46,7 +47,10 @@ def run # res = Request.new(:data_plane_url => @data_plane_url, :ssl => @ssl).post @write_key, @batch res = @transport.send @write_key, @batch - @on_error.call(res.status, res.error) unless res.status == 200 + unless res.status == 200 + @on_error.call(res.status, res.error) + @on_error_with_messages.call(res.status, res.error, @batch.messages) + end @lock.synchronize { @batch.clear } end @@ -66,6 +70,7 @@ def consume_message_from_queue! @batch << @queue.pop rescue MessageBatch::JSONGenerationError => e @on_error.call(-1, e.to_s) + @on_error_with_messages.call(-1, e.to_s, []) end end end diff --git a/spec/rudder/analytics/worker_spec.rb b/spec/rudder/analytics/worker_spec.rb index 544f7ee..e752b9a 100644 --- a/spec/rudder/analytics/worker_spec.rb +++ b/spec/rudder/analytics/worker_spec.rb @@ -79,6 +79,38 @@ class Analytics expect(error).to eq('Some error') end + it 'executes the on_error_with_messages error handler if the request is invalid' do + Rudder::Analytics::Transport + .any_instance + .stub(:send) + .and_return(Rudder::Analytics::Response.new(400, 'Some error')) + + status = error = data = nil + on_error_with_messages = proc do |yielded_status, yielded_error, yielded_data| + sleep 0.2 # Make this take longer than thread spin-up (below) + status, error, data = yielded_status, yielded_error, yielded_data.dup + end + + message = { context: {:ip=>"127.0.0.1"}, type: "identify", traits: { email: "test@test.com" } } + queue = Queue.new + queue << message + config = Configuration.new({ :on_error_with_messages => on_error_with_messages, :write_key => 'write_key', :data_plane_url => 'data_plane_url' }) + worker = described_class.new(queue, config) + + # This is to ensure that Client#flush doesn't finish before calling + # the error handler. + Thread.new { worker.run } + sleep 0.1 # First give thread time to spin-up. + sleep 0.01 while worker.is_requesting? + + Rudder::Analytics::Transport.any_instance.unstub(:send) + + expect(queue).to be_empty + expect(status).to eq(400) + expect(error).to eq('Some error') + expect(data).to eq([message]) + end + # it 'does not call on_error if the request is good' do # on_error = proc do |status, error| # puts "#{status}, #{error}"