diff --git a/lib/ddtrace.rb b/lib/ddtrace.rb
index d5b485725e1..b6d4179a081 100644
--- a/lib/ddtrace.rb
+++ b/lib/ddtrace.rb
@@ -53,6 +53,7 @@ module Datadog
 require 'ddtrace/contrib/http/integration'
 require 'ddtrace/contrib/integration'
 require 'ddtrace/contrib/presto/integration'
+require 'ddtrace/contrib/kafka/integration'
 require 'ddtrace/contrib/mysql2/integration'
 require 'ddtrace/contrib/mongodb/integration'
 require 'ddtrace/contrib/pg/integration'
diff --git a/lib/ddtrace/contrib/kafka/configuration/settings.rb b/lib/ddtrace/contrib/kafka/configuration/settings.rb
new file mode 100644
index 00000000000..2a12513feeb
--- /dev/null
+++ b/lib/ddtrace/contrib/kafka/configuration/settings.rb
@@ -0,0 +1,15 @@
+require 'ddtrace/contrib/configuration/settings'
+require 'ddtrace/contrib/kafka/ext'
+
+module Datadog
+  module Contrib
+    module Kafka
+      module Configuration
+        # Custom settings for the Kafka integration
+        class Settings < Contrib::Configuration::Settings
+          option :service_name, default: Ext::SERVICE_NAME
+        end
+      end
+    end
+  end
+end
diff --git a/lib/ddtrace/contrib/kafka/ext.rb b/lib/ddtrace/contrib/kafka/ext.rb
new file mode 100644
index 00000000000..eefbc15ee05
--- /dev/null
+++ b/lib/ddtrace/contrib/kafka/ext.rb
@@ -0,0 +1,17 @@
+module Datadog
+  module Contrib
+    module Kafka
+      # Kafka integration constants
+      module Ext
+        APP = 'kafka'.freeze
+        SERVICE_NAME = 'kafka'.freeze
+
+        SPAN_REQUEST = 'kafka.request'.freeze
+
+        TAG_CLUSTER = 'cluster'.freeze
+        TAG_BUFFER_SIZE = 'buffer_size'.freeze
+        TAG_PARTITION = 'partition'.freeze
+      end
+    end
+  end
+end
diff --git a/lib/ddtrace/contrib/kafka/integration.rb b/lib/ddtrace/contrib/kafka/integration.rb
new file mode 100644
index 00000000000..e5e20adb6f7
--- /dev/null
+++ b/lib/ddtrace/contrib/kafka/integration.rb
@@ -0,0 +1,32 @@
+require 'ddtrace/contrib/integration'
+require 'ddtrace/contrib/kafka/configuration/settings'
+require 'ddtrace/contrib/kafka/patcher'
+
+module Datadog
+  module Contrib
+    module Kafka
+      # Description of Kafka integration
+      class Integration
+        include Contrib::Integration
+
+        register_as :kafka
+
+        def self.version
+          Gem.loaded_specs['ruby-kafka'] && Gem.loaded_specs['ruby-kafka'].version
+        end
+
+        def self.present?
+          super && defined?(::Kafka)
+        end
+
+        def default_configuration
+          Configuration::Settings.new
+        end
+
+        def patcher
+          Patcher
+        end
+      end
+    end
+  end
+end
diff --git a/lib/ddtrace/contrib/kafka/patcher.rb b/lib/ddtrace/contrib/kafka/patcher.rb
new file mode 100644
index 00000000000..0f4205156ee
--- /dev/null
+++ b/lib/ddtrace/contrib/kafka/patcher.rb
@@ -0,0 +1,34 @@
+require 'ddtrace/contrib/patcher'
+require 'ddtrace/contrib/kafka/producer'
+
+module Datadog
+  module Contrib
+    module Kafka
+      # Patcher enables patching of 'ruby-kafka' module.
+      module Patcher
+        include Contrib::Patcher
+
+        module_function
+
+        def patched?
+          done?(:kafka)
+        end
+
+        def patch
+          do_once(:kafka) do
+            begin
+              patch_kafka_producer
+            rescue StandardError => e
+              Datadog::Tracer.log.error("Unable to apply kafka integration: #{e}")
+            end
+          end
+        end
+
+        # Prepends the instrumentation module onto Kafka::Producer.
+        def patch_kafka_producer
+          ::Kafka::Producer.send(:include, Producer)
+        end
+      end
+    end
+  end
+end
diff --git a/lib/ddtrace/contrib/kafka/producer.rb b/lib/ddtrace/contrib/kafka/producer.rb
new file mode 100644
index 00000000000..87f928de90f
--- /dev/null
+++ b/lib/ddtrace/contrib/kafka/producer.rb
@@ -0,0 +1,76 @@
+require 'ddtrace/ext/app_types'
+require 'ddtrace/contrib/kafka/ext'
+
+module Datadog
+  module Contrib
+    module Kafka
+      # Kafka::Producer patch module
+      module Producer
+        module_function
+
+        # `include` hook: prepend the instrumentation so that `super` in the
+        # instrumented methods reaches the original ruby-kafka implementation.
+        def included(base)
+          base.send(:prepend, InstanceMethods)
+        end
+
+        # Kafka::Producer patch instance methods
+        module InstanceMethods
+          # This will actually capture the time we spend producing data to Kafka in background
+          # threads by Sidekiq/API workers.
+          #
+          # returns nothing, takes no args
+          def deliver_messages
+            if buffer_size.zero?
+              # don't bother tracing flushes with 0 in the buffer...
+              super
+            else
+              datadog_pin.tracer.trace(Ext::SPAN_REQUEST) do |span|
+                span.resource = 'deliver_messages'.freeze
+                span.service = datadog_pin.service
+                span.span_type = Datadog::Ext::AppTypes::CUSTOM
+                span.set_tag(Ext::TAG_BUFFER_SIZE, buffer_size)
+                # NOTE(review): Kafka::Cluster#cluster_info may issue a cluster
+                # metadata request on first call -- confirm this cost is acceptable here.
+                span.set_tag(Ext::TAG_CLUSTER, @cluster.cluster_info)
+                super # this will pass all args, including the block
+              end
+            end
+          end
+
+          # This will capture the number/type of messages being enqueued to be
+          # processed by `deliver_messages` above as spans in application traces.
+          #
+          # @param value [String] the message data.
+          # @param key [String] the message key.
+          # @param headers [Hash] the headers for the message.
+          # @param topic [String] the topic that the message should be written to.
+          # @param partition [Integer] the partition that the message should be written to.
+          # @param partition_key [String] the key that should be used to assign a partition.
+          # @param create_time [Time] the timestamp that should be set on the message.
+          #
+          # @raise [BufferOverflow] if the maximum buffer size has been reached.
+          # @return [nil]
+          def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now)
+            datadog_pin.tracer.trace(Ext::SPAN_REQUEST) do |span|
+              span.resource = topic.to_s
+              span.service = datadog_pin.service
+              span.span_type = Datadog::Ext::AppTypes::CUSTOM
+              span.set_tag(Ext::TAG_PARTITION, partition)
+              super # this will pass all args, including the block
+            end
+          end
+
+          def datadog_pin
+            @datadog_pin ||= Datadog::Pin.new(
+              Datadog.configuration[:kafka][:service_name],
+              app: Ext::APP,
+              app_type: Datadog::Ext::AppTypes::CUSTOM,
+              tracer: Datadog.configuration[:kafka][:tracer]
+            )
+          end
+        end
+      end
+    end
+  end
+end