From 383ab3f091d843024ea7fe8c66b91cecf2b2bb00 Mon Sep 17 00:00:00 2001 From: Chunhui Teng Date: Thu, 21 Nov 2024 14:51:48 -0500 Subject: [PATCH] add an option (on_error_with_messages) to provide a callback option to get the data affected by the error --- lib/rudder/analytics/client.rb | 1 + lib/rudder/analytics/configuration.rb | 3 ++- lib/rudder/analytics/message_batch.rb | 2 +- lib/rudder/analytics/version.rb | 2 +- lib/rudder/analytics/worker.rb | 7 ++++++- 5 files changed, 11 insertions(+), 4 deletions(-) diff --git a/lib/rudder/analytics/client.rb b/lib/rudder/analytics/client.rb index b1cdf1e..272486e 100644 --- a/lib/rudder/analytics/client.rb +++ b/lib/rudder/analytics/client.rb @@ -24,6 +24,7 @@ class Client # @option opts [FixNum] :max_queue_size Maximum number of calls to be # remain queued. # @option opts [Proc] :on_error Handles error calls from the API. + # @option opts [Proc] :on_error_with_messages Handles error calls from the API, with failed messages. def initialize(opts = {}) @config = Configuration.new(opts) @queue = Queue.new diff --git a/lib/rudder/analytics/configuration.rb b/lib/rudder/analytics/configuration.rb index 8149a1f..0906657 100644 --- a/lib/rudder/analytics/configuration.rb +++ b/lib/rudder/analytics/configuration.rb @@ -7,7 +7,7 @@ class Analytics class Configuration include Rudder::Analytics::Utils - attr_reader :write_key, :data_plane_url, :on_error, :stub, :gzip, :ssl, :batch_size, :test, :max_queue_size, :backoff_policy, :retries + attr_reader :write_key, :data_plane_url, :on_error, :on_error_with_messages, :stub, :gzip, :ssl, :batch_size, :test, :max_queue_size, :backoff_policy, :retries def initialize(settings = {}) symbolized_settings = symbolize_keys(settings) @@ -18,6 +18,7 @@ def initialize(settings = {}) @max_queue_size = symbolized_settings[:max_queue_size] || Defaults::Queue::MAX_SIZE @ssl = symbolized_settings[:ssl] @on_error = symbolized_settings[:on_error] || proc { |status, error| } + @on_error_with_messages = symbolized_settings[:on_error_with_messages] || proc { |status, error, messages| } @stub = symbolized_settings[:stub] @batch_size = symbolized_settings[:batch_size] || Defaults::MessageBatch::MAX_SIZE @gzip = symbolized_settings[:gzip] diff --git a/lib/rudder/analytics/message_batch.rb b/lib/rudder/analytics/message_batch.rb index 60811c5..0d0366e 100644 --- a/lib/rudder/analytics/message_batch.rb +++ b/lib/rudder/analytics/message_batch.rb @@ -26,7 +26,7 @@ def <<(message) message_json = message.to_json # puts message_json rescue StandardError => e - raise JSONGenerationError, "Serialization error: #{e}" + raise JSONGenerationError, "Serialization error: #{e} for message: #{message.inspect}" end message_json_size = message_json.bytesize diff --git a/lib/rudder/analytics/version.rb b/lib/rudder/analytics/version.rb index a5e0056..f3f08c4 100644 --- a/lib/rudder/analytics/version.rb +++ b/lib/rudder/analytics/version.rb @@ -2,6 +2,6 @@ module Rudder class Analytics - VERSION = '3.0.1' + VERSION = '3.1.0' end end diff --git a/lib/rudder/analytics/worker.rb b/lib/rudder/analytics/worker.rb index ff18190..ae812cf 100644 --- a/lib/rudder/analytics/worker.rb +++ b/lib/rudder/analytics/worker.rb @@ -29,6 +29,7 @@ def initialize(queue, config) @write_key = config.write_key @ssl = config.ssl @on_error = config.on_error + @on_error_with_messages = config.on_error_with_messages @batch = MessageBatch.new(config.batch_size) @lock = Mutex.new @transport = Transport.new(config) @@ -46,7 +47,10 @@ def run # res = Request.new(:data_plane_url => @data_plane_url, :ssl => @ssl).post @write_key, @batch res = @transport.send @write_key, @batch - @on_error.call(res.status, res.error) unless res.status == 200 + unless res.status == 200 + @on_error.call(res.status, res.error) + @on_error_with_messages.call(res.status, res.error, @batch.messages) + end @lock.synchronize { @batch.clear } end @@ -66,6 +70,7 @@ def consume_message_from_queue! @batch << @queue.pop rescue MessageBatch::JSONGenerationError => e @on_error.call(-1, e.to_s) + @on_error_with_messages.call(-1, e.to_s, []) end end end