Skip to content

Commit

Permalink
add an option (on_error_with_messages) to provide a callback option t…
Browse files Browse the repository at this point in the history
…o get the data affected by the error
  • Loading branch information
chunhuiteng committed Nov 21, 2024
1 parent fc98c9c commit 383ab3f
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 4 deletions.
1 change: 1 addition & 0 deletions lib/rudder/analytics/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class Client
# @option opts [FixNum] :max_queue_size Maximum number of calls to be
# remain queued.
# @option opts [Proc] :on_error Handles error calls from the API.
# @option opts [Proc] :on_error_with_messages Handles error calls from the API, with failed messages.
def initialize(opts = {})
@config = Configuration.new(opts)
@queue = Queue.new
Expand Down
3 changes: 2 additions & 1 deletion lib/rudder/analytics/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class Analytics
class Configuration
include Rudder::Analytics::Utils

attr_reader :write_key, :data_plane_url, :on_error, :stub, :gzip, :ssl, :batch_size, :test, :max_queue_size, :backoff_policy, :retries
attr_reader :write_key, :data_plane_url, :on_error, :on_error_with_messages, :stub, :gzip, :ssl, :batch_size, :test, :max_queue_size, :backoff_policy, :retries

def initialize(settings = {})
symbolized_settings = symbolize_keys(settings)
Expand All @@ -18,6 +18,7 @@ def initialize(settings = {})
@max_queue_size = symbolized_settings[:max_queue_size] || Defaults::Queue::MAX_SIZE
@ssl = symbolized_settings[:ssl]
@on_error = symbolized_settings[:on_error] || proc { |status, error| }
@on_error_with_messages = symbolized_settings[:on_error_with_messages] || proc { |status, error, messages| }
@stub = symbolized_settings[:stub]
@batch_size = symbolized_settings[:batch_size] || Defaults::MessageBatch::MAX_SIZE
@gzip = symbolized_settings[:gzip]
Expand Down
2 changes: 1 addition & 1 deletion lib/rudder/analytics/message_batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def <<(message)
message_json = message.to_json
# puts message_json
rescue StandardError => e
raise JSONGenerationError, "Serialization error: #{e}"
raise JSONGenerationError, "Serialization error: #{e} for message: #{message.inspect}"
end

message_json_size = message_json.bytesize
Expand Down
2 changes: 1 addition & 1 deletion lib/rudder/analytics/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Rudder
class Analytics
VERSION = '3.0.1'
VERSION = '3.1.0'
end
end
7 changes: 6 additions & 1 deletion lib/rudder/analytics/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def initialize(queue, config)
@write_key = config.write_key
@ssl = config.ssl
@on_error = config.on_error
@on_error_with_messages = config.on_error_with_messages
@batch = MessageBatch.new(config.batch_size)
@lock = Mutex.new
@transport = Transport.new(config)
Expand All @@ -46,7 +47,10 @@ def run

# res = Request.new(:data_plane_url => @data_plane_url, :ssl => @ssl).post @write_key, @batch
res = @transport.send @write_key, @batch
@on_error.call(res.status, res.error) unless res.status == 200
unless res.status == 200
@on_error.call(res.status, res.error)
@on_error_with_messages.call(res.status, res.error, @batch.messages)
end

@lock.synchronize { @batch.clear }
end
Expand All @@ -66,6 +70,7 @@ def consume_message_from_queue!
@batch << @queue.pop
rescue MessageBatch::JSONGenerationError => e
@on_error.call(-1, e.to_s)
@on_error_with_messages.call(-1, e.to_s, [])
end
end
end
Expand Down

0 comments on commit 383ab3f

Please sign in to comment.