From c2138e8d8f4d8f1aae7131390186a88dbb882317 Mon Sep 17 00:00:00 2001 From: Rafael Gibim <9031589+Drowze@users.noreply.github.com> Date: Wed, 3 Sep 2025 17:13:49 -0300 Subject: [PATCH 1/2] Support different karafka configurations per topic --- .../tracing/contrib/karafka/integration.rb | 4 ++ .../tracing/contrib/karafka/patcher.rb | 5 +- .../tracing/contrib/karafka/patcher_spec.rb | 64 +++++++++++++------ 3 files changed, 53 insertions(+), 20 deletions(-) diff --git a/lib/datadog/tracing/contrib/karafka/integration.rb b/lib/datadog/tracing/contrib/karafka/integration.rb index 8c0708b029a..45417c0c7db 100644 --- a/lib/datadog/tracing/contrib/karafka/integration.rb +++ b/lib/datadog/tracing/contrib/karafka/integration.rb @@ -38,6 +38,10 @@ def new_configuration def patcher Patcher end + + def resolver + @resolver ||= Contrib::Configuration::Resolvers::PatternResolver.new + end end end end diff --git a/lib/datadog/tracing/contrib/karafka/patcher.rb b/lib/datadog/tracing/contrib/karafka/patcher.rb index 38fde6115e0..146bb71597a 100644 --- a/lib/datadog/tracing/contrib/karafka/patcher.rb +++ b/lib/datadog/tracing/contrib/karafka/patcher.rb @@ -10,8 +10,8 @@ module Contrib module Karafka # Patch to add tracing to Karafka::Messages::Messages module MessagesPatch - def configuration - Datadog.configuration.tracing[:karafka] + def datadog_configuration(topic) + Datadog.configuration.tracing[:karafka, topic] end def propagation @@ -28,6 +28,7 @@ def each(&block) parent_trace_digest = Datadog::Tracing.active_trace&.to_digest @messages_array.each do |message| + configuration = datadog_configuration(message.topic) trace_digest = if configuration[:distributed_tracing] headers = if message.metadata.respond_to?(:raw_headers) message.metadata.raw_headers diff --git a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb index 0eee615c222..9be7e726f1e 100644 --- a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb +++ b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb @@ -17,6 +17,7 @@ before do Datadog.configure do |c| c.tracing.instrument :karafka, configuration_options + c.tracing.instrument :karafka, describes: /special_/, distributed_tracing: false end end @@ -31,16 +32,12 @@ let(:span_name) { Datadog::Tracing::Contrib::Karafka::Ext::SPAN_MESSAGE_CONSUME } it 'is expected to send a span' do - metadata = ::Karafka::Messages::Metadata.new - metadata['offset'] = 412 + metadata = ::Karafka::Messages::Metadata.new(offset: 412, timestamp: Time.now, topic: 'topic_a') raw_payload = rand.to_s message = ::Karafka::Messages::Message.new(raw_payload, metadata) - allow(message).to receive(:timestamp).and_return(Time.now) - allow(message).to receive(:topic).and_return('topic_a') - - topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0)) + topic = ::Karafka::Routing::Topic.new(message.topic, double(id: 0)) messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now) expect(messages).to all(be_a(::Karafka::Messages::Message)) @@ -55,6 +52,7 @@ end context 'when the message has tracing headers' do + let(:topic_name) { "topic_a" } let(:message) do headers = {} producer_trace = nil @@ -64,15 +62,15 @@ producer_trace = trace Datadog::Tracing::Contrib::Karafka.inject(trace.to_digest, headers) end - metadata = ::Karafka::Messages::Metadata.new - metadata['offset'] = 412 - metadata[headers_accessor] = headers + metadata = ::Karafka::Messages::Metadata.new( + offset: 412, + headers_accessor => headers, + topic: topic_name, + timestamp: Time.now + ) raw_payload = rand.to_s - message = ::Karafka::Messages::Message.new(raw_payload, metadata) - allow(message).to receive(:timestamp).and_return(Time.now) - allow(message).to receive(:topic).and_return('topic_a') - message + ::Karafka::Messages::Message.new(raw_payload, metadata) end let(:headers_accessor) do ::Karafka::Messages::Metadata.members.include?(:raw_headers) ? 'raw_headers' : 'headers' @@ -89,7 +87,7 @@ consumer_span = Datadog::Tracing.active_span consumer_trace = Datadog::Tracing.active_trace - topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0)) + topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0)) messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now) expect(messages).to all(be_a(::Karafka::Messages::Message)) @@ -113,6 +111,37 @@ end end + context "when distributed tracing is disabled for the topic in particular" do + let(:topic_name) { "special_topic" } + + it 'does not continue the span that produced the message' do + consumer_span = nil + consumer_trace = nil + + Datadog::Tracing.trace('consumer') do + consumer_span = Datadog::Tracing.active_span + consumer_trace = Datadog::Tracing.active_trace + + topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0)) + messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now) + expect(messages).to all(be_a(::Karafka::Messages::Message)) + + # assert that the current trace re-set to the original trace after iterating the messages + expect(Datadog::Tracing.active_trace).to eq(consumer_trace) + expect(Datadog::Tracing.active_span).to eq(consumer_span) + end + + expect(spans).to have(3).items + + # assert that the message span is not continuation of the producer span + expect(span.parent_id).to eq(consumer_span.id) + expect(span.trace_id).to eq(consumer_trace.id) + + expect(span.links).to be_empty + expect(consumer_span.links).to be_empty + end + end + context 'when distributed tracing is not enabled' do let(:configuration_options) { { distributed_tracing: false } } @@ -124,7 +153,7 @@ consumer_span = Datadog::Tracing.active_span consumer_trace = Datadog::Tracing.active_trace - topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0)) + topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0)) messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now) expect(messages).to all(be_a(::Karafka::Messages::Message)) @@ -150,12 +179,11 @@ let(:span_name) { Datadog::Tracing::Contrib::Karafka::Ext::SPAN_WORKER_PROCESS } it 'is expected to send a span' do - metadata = ::Karafka::Messages::Metadata.new - metadata['offset'] = 412 + metadata = ::Karafka::Messages::Metadata.new(offset: 412, topic: 'topic_a') raw_payload = rand.to_s message = ::Karafka::Messages::Message.new(raw_payload, metadata) - job = double(executor: double(topic: double(name: 'topic_a', consumer: 'ABC'), partition: 0), messages: [message]) + job = double(executor: double(topic: double(name: message.topic, consumer: 'ABC'), partition: 0), messages: [message]) Karafka.monitor.instrument('worker.processed', { job: job }) do # Noop From b920eee950406e72bff2250f4f1c809dbc6f43e2 Mon Sep 17 00:00:00 2001 From: Rafael Gibim <9031589+Drowze@users.noreply.github.com> Date: Wed, 3 Sep 2025 18:33:47 -0300 Subject: [PATCH 2/2] Support different waterdrop configurations per topic --- .../tracing/contrib/karafka/framework.rb | 14 ++++++-- .../tracing/contrib/waterdrop/integration.rb | 4 +++ .../tracing/contrib/waterdrop/monitor.rb | 16 +++++---- .../tracing/contrib/karafka/patcher_spec.rb | 35 +++++++++++++++++++ .../tracing/contrib/waterdrop/monitor_spec.rb | 2 -- 5 files changed, 60 insertions(+), 11 deletions(-) diff --git a/lib/datadog/tracing/contrib/karafka/framework.rb b/lib/datadog/tracing/contrib/karafka/framework.rb index 979af04310f..38f0748f474 100644 --- a/lib/datadog/tracing/contrib/karafka/framework.rb +++ b/lib/datadog/tracing/contrib/karafka/framework.rb @@ -9,18 +9,26 @@ module Karafka # - instrument parts of the framework when needed module Framework def self.setup + karafka_configurations = Datadog.configuration.tracing.fetch_integration(:karafka).configurations + waterdrop_configurations = Datadog.configuration.tracing.fetch_integration(:waterdrop).configurations + Datadog.configure do |datadog_config| - karafka_config = datadog_config.tracing[:karafka] - activate_waterdrop!(datadog_config, karafka_config) + karafka_configurations.each do |name, karafka_config| + # do not override user configuration + next if name != :default && waterdrop_configurations.key?(name) + activate_waterdrop!(datadog_config, karafka_config, name) + end end end # Apply relevant configuration from Karafka to WaterDrop - def self.activate_waterdrop!(datadog_config, karafka_config) + def self.activate_waterdrop!(datadog_config, karafka_config, name) datadog_config.tracing.instrument( :waterdrop, + enabled: karafka_config[:enabled], service_name: karafka_config[:service_name], distributed_tracing: karafka_config[:distributed_tracing], + describes: name ) end end diff --git a/lib/datadog/tracing/contrib/waterdrop/integration.rb b/lib/datadog/tracing/contrib/waterdrop/integration.rb index 7efe7e1d507..6e776157fe5 100644 --- a/lib/datadog/tracing/contrib/waterdrop/integration.rb +++ b/lib/datadog/tracing/contrib/waterdrop/integration.rb @@ -37,6 +37,10 @@ def new_configuration def patcher Patcher end + + def resolver + @resolver ||= Contrib::Configuration::Resolvers::PatternResolver.new + end end end end diff --git a/lib/datadog/tracing/contrib/waterdrop/monitor.rb b/lib/datadog/tracing/contrib/waterdrop/monitor.rb index 480c5f8bb46..63dba40ccb0 100644 --- a/lib/datadog/tracing/contrib/waterdrop/monitor.rb +++ b/lib/datadog/tracing/contrib/waterdrop/monitor.rb @@ -18,10 +18,6 @@ module Monitor message.produced_sync ].freeze - def configuration - Datadog.configuration.tracing[:waterdrop] - end - def instrument(event_id, payload = {}, &block) return super unless TRACEABLE_EVENTS.include?(event_id) @@ -40,7 +36,7 @@ def instrument(event_id, payload = {}, &block) span.set_tag(Contrib::Karafka::Ext::TAG_MESSAGE_COUNT, payload[:messages].size) - payload[:messages].each { |message| inject(trace_digest, message) } if configuration[:distributed_tracing] + payload[:messages].each { |message| inject(trace_digest, message) } else action = event_id.sub('message.produced', 'produce') @@ -48,7 +44,7 @@ def instrument(event_id, payload = {}, &block) span.set_tag(Contrib::Karafka::Ext::TAG_PARTITION, payload[:message][:partition]) span.set_tag(Contrib::Karafka::Ext::TAG_MESSAGE_COUNT, 1) - inject(trace_digest, payload[:message]) if configuration[:distributed_tracing] + inject(trace_digest, payload[:message]) end span.resource = "waterdrop.#{action}" @@ -63,9 +59,17 @@ def instrument(event_id, payload = {}, &block) private def inject(trace_digest, message) + return unless datadog_configuration(message[:topic])[:distributed_tracing] + message[:headers] ||= {} WaterDrop.inject(trace_digest, message[:headers]) end + + # cache the configuration resolution per topic to avoid repeated lookups in message batches + def datadog_configuration(topic) + @datadog_configuration ||= {} + @datadog_configuration[topic] ||= Datadog.configuration.tracing[:waterdrop, topic] + end end end end diff --git a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb index 9be7e726f1e..f4bf5fb64bc 100644 --- a/spec/datadog/tracing/contrib/karafka/patcher_spec.rb +++ b/spec/datadog/tracing/contrib/karafka/patcher_spec.rb @@ -199,4 +199,39 @@ expect(span.resource).to eq 'ABC#consume' end end + + describe "framework auto-instrumentation" do + around do |example| + # Reset before and after each example; don't allow global state to linger. + Datadog.registry[:waterdrop].reset_configuration! + example.run + Datadog.registry[:waterdrop].reset_configuration! + + # reset Karafka internal state as well + Karafka::App.config.internal.status.reset! + Karafka.refresh! + end + + before do + Datadog.configure do |c| + c.tracing.instrument :karafka, describes: "conflicting_topic", distributed_tracing: true + c.tracing.instrument :waterdrop, describes: "conflicting_topic", distributed_tracing: false + end + end + + it "automatically enables waterdrop instrumentation" do + Karafka::App.setup do |c| + c.kafka = { 'bootstrap.servers': '127.0.0.1:9092' } + end + + expect(Datadog.configuration.tracing[:waterdrop][:enabled]).to be true + expect(Datadog.configuration.tracing[:waterdrop][:distributed_tracing]).to be true + + expect(Datadog.configuration.tracing[:waterdrop, "special_topic"][:enabled]).to be true + expect(Datadog.configuration.tracing[:waterdrop, "special_topic"][:distributed_tracing]).to be false + + expect(Datadog.configuration.tracing[:waterdrop, "conflicting_topic"][:enabled]).to be true + expect(Datadog.configuration.tracing[:waterdrop, "conflicting_topic"][:distributed_tracing]).to be false + end + end end diff --git a/spec/datadog/tracing/contrib/waterdrop/monitor_spec.rb b/spec/datadog/tracing/contrib/waterdrop/monitor_spec.rb index 05f53fa565c..d736a49ac81 100644 --- a/spec/datadog/tracing/contrib/waterdrop/monitor_spec.rb +++ b/spec/datadog/tracing/contrib/waterdrop/monitor_spec.rb @@ -8,8 +8,6 @@ end require 'datadog' -puts "waterdrop version: #{WaterDrop::VERSION}" - RSpec.describe 'Waterdrop monitor' do before do Datadog.configure do |c|