Skip to content

Commit

Permalink
feat: add an error callback option to provide the affected data (#22)
Browse files Browse the repository at this point in the history
* add an option (on_error_with_messages) to provide a callback option to get the data affected by the error

* add an option (on_error_with_messages) to provide a callback option to get the data affected by the error
  • Loading branch information
chunhuiteng authored Dec 6, 2024
1 parent fc98c9c commit af48c1a
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 4 deletions.
1 change: 1 addition & 0 deletions lib/rudder/analytics/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class Client
# @option opts [FixNum] :max_queue_size Maximum number of calls to be
# remain queued.
# @option opts [Proc] :on_error Handles error calls from the API.
# @option opts [Proc] :on_error_with_messages Handles error calls from the API, with failed messages.
def initialize(opts = {})
@config = Configuration.new(opts)
@queue = Queue.new
Expand Down
3 changes: 2 additions & 1 deletion lib/rudder/analytics/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class Analytics
class Configuration
include Rudder::Analytics::Utils

attr_reader :write_key, :data_plane_url, :on_error, :stub, :gzip, :ssl, :batch_size, :test, :max_queue_size, :backoff_policy, :retries
attr_reader :write_key, :data_plane_url, :on_error, :on_error_with_messages, :stub, :gzip, :ssl, :batch_size, :test, :max_queue_size, :backoff_policy, :retries

def initialize(settings = {})
symbolized_settings = symbolize_keys(settings)
Expand All @@ -18,6 +18,7 @@ def initialize(settings = {})
@max_queue_size = symbolized_settings[:max_queue_size] || Defaults::Queue::MAX_SIZE
@ssl = symbolized_settings[:ssl]
@on_error = symbolized_settings[:on_error] || proc { |status, error| }
@on_error_with_messages = symbolized_settings[:on_error_with_messages] || proc { |status, error, messages| }
@stub = symbolized_settings[:stub]
@batch_size = symbolized_settings[:batch_size] || Defaults::MessageBatch::MAX_SIZE
@gzip = symbolized_settings[:gzip]
Expand Down
2 changes: 1 addition & 1 deletion lib/rudder/analytics/message_batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def <<(message)
message_json = message.to_json
# puts message_json
rescue StandardError => e
raise JSONGenerationError, "Serialization error: #{e}"
raise JSONGenerationError, "Serialization error: #{e} for message: #{message.inspect}"
end

message_json_size = message_json.bytesize
Expand Down
2 changes: 1 addition & 1 deletion lib/rudder/analytics/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Rudder
class Analytics
VERSION = '3.0.1'
VERSION = '3.1.0'
end
end
7 changes: 6 additions & 1 deletion lib/rudder/analytics/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def initialize(queue, config)
@write_key = config.write_key
@ssl = config.ssl
@on_error = config.on_error
@on_error_with_messages = config.on_error_with_messages
@batch = MessageBatch.new(config.batch_size)
@lock = Mutex.new
@transport = Transport.new(config)
Expand All @@ -46,7 +47,10 @@ def run

# res = Request.new(:data_plane_url => @data_plane_url, :ssl => @ssl).post @write_key, @batch
res = @transport.send @write_key, @batch
@on_error.call(res.status, res.error) unless res.status == 200
unless res.status == 200
@on_error.call(res.status, res.error)
@on_error_with_messages.call(res.status, res.error, @batch.messages)
end

@lock.synchronize { @batch.clear }
end
Expand All @@ -66,6 +70,7 @@ def consume_message_from_queue!
@batch << @queue.pop
rescue MessageBatch::JSONGenerationError => e
@on_error.call(-1, e.to_s)
@on_error_with_messages.call(-1, e.to_s, [])
end
end
end
Expand Down
32 changes: 32 additions & 0 deletions spec/rudder/analytics/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,38 @@ class Analytics
expect(error).to eq('Some error')
end

it 'executes the on_error_with_messages error handler if the request is invalid' do
Rudder::Analytics::Transport
.any_instance
.stub(:send)
.and_return(Rudder::Analytics::Response.new(400, 'Some error'))

status = error = data = nil
on_error_with_messages = proc do |yielded_status, yielded_error, yielded_data|
sleep 0.2 # Make this take longer than thread spin-up (below)
status, error, data = yielded_status, yielded_error, yielded_data.dup
end

message = { context: {:ip=>"127.0.0.1"}, type: "identify", traits: { email: "[email protected]" } }
queue = Queue.new
queue << message
config = Configuration.new({ :on_error_with_messages => on_error_with_messages, :write_key => 'write_key', :data_plane_url => 'data_plane_url' })
worker = described_class.new(queue, config)

# This is to ensure that Client#flush doesn't finish before calling
# the error handler.
Thread.new { worker.run }
sleep 0.1 # First give thread time to spin-up.
sleep 0.01 while worker.is_requesting?

Rudder::Analytics::Transport.any_instance.unstub(:send)

expect(queue).to be_empty
expect(status).to eq(400)
expect(error).to eq('Some error')
expect(data).to eq([message])
end

# it 'does not call on_error if the request is good' do
# on_error = proc do |status, error|
# puts "#{status}, #{error}"
Expand Down

0 comments on commit af48c1a

Please sign in to comment.