Skip to content

Commit

Permalink
First attempt at ruby-kafka tracer
Browse files Browse the repository at this point in the history
  • Loading branch information
zachmccormick committed Mar 23, 2020
1 parent 9fb56bf commit 4be8fef
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 0 deletions.
1 change: 1 addition & 0 deletions lib/ddtrace.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ module Datadog
require 'ddtrace/contrib/http/integration'
require 'ddtrace/contrib/integration'
require 'ddtrace/contrib/presto/integration'
require 'ddtrace/contrib/kafka/integration'
require 'ddtrace/contrib/mysql2/integration'
require 'ddtrace/contrib/mongodb/integration'
require 'ddtrace/contrib/pg/integration'
Expand Down
15 changes: 15 additions & 0 deletions lib/ddtrace/contrib/kafka/configuration/settings.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
require 'ddtrace/contrib/configuration/settings'
require 'ddtrace/contrib/kafka/ext'

module Datadog
module Contrib
module Kafka
module Configuration
# Custom settings for the Kafka integration
class Settings < Contrib::Configuration::Settings
option :service_name, default: Ext::SERVICE_NAME
end
end
end
end
end
17 changes: 17 additions & 0 deletions lib/ddtrace/contrib/kafka/ext.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module Datadog
module Contrib
module Kafka
# Kafka integration constants
module Ext
APP = 'kafka'.freeze
SERVICE_NAME = 'kafka'.freeze

SPAN_REQUEST = 'kafka.request'.freeze

TAG_CLUSTER = 'cluster'.freeze
TAG_BUFFER_SIZE = 'buffer_size'.freeze
TAG_PARTITION = 'partition'.freeze
end
end
end
end
32 changes: 32 additions & 0 deletions lib/ddtrace/contrib/kafka/integration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
require 'ddtrace/contrib/integration'
require 'ddtrace/contrib/kafka/configuration/settings'
require 'ddtrace/contrib/kafka/patcher'

module Datadog
module Contrib
module Kafka
# Description of Kafka integration
class Integration
include Contrib::Integration

register_as :kafka

def self.version
Gem.loaded_specs['ruby-kafka'] && Gem.loaded_specs['ruby-kafka'].version
end

def self.present?
super && defined?(::Kafka)
end

def default_configuration
Configuration::Settings.new
end

def patcher
Patcher
end
end
end
end
end
33 changes: 33 additions & 0 deletions lib/ddtrace/contrib/kafka/patcher.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
require 'ddtrace/contrib/patcher'
require 'ddtrace/contrib/kafka/producer'

module Datadog
module Contrib
module Kafka
# Patcher enables patching of 'ruby-kafka' module.
module Patcher
include Contrib::Patcher

module_function

def patched?
done?(:kafka)
end

def patch
do_once(:kafka) do
begin
patch_kafka_client
rescue StandardError => e
Datadog::Tracer.log.error("Unable to apply kafka integration: #{e}")
end
end
end

def patch_kafka_client
::Kafka::Producer.send(:include, Producer)
end
end
end
end
end
75 changes: 75 additions & 0 deletions lib/ddtrace/contrib/kafka/producer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
require 'ddtrace/ext/app_types'
require 'ddtrace/ext/net'
require 'ddtrace/ext/sql'
require 'ddtrace/contrib/kafka/ext'

module Datadog
module Contrib
module Kafka
# Kafka::Producer patch module
module Producer
module_function

def included(base)
base.send(:prepend, InstanceMethods)
end

# Kafka::Producer patch instance methods
module InstanceMethods

# This will actually capture the time we spend producing data to Kafka in background
# threads by Sidekiq/API workers.
#
# returns nothing, takes no args
def deliver_messages
if buffer_size == 0
# don't bother tracing flushes with 0 in the buffer...
super
else
datadog_pin.tracer.trace(Ext::SPAN_REQUEST) do |span|
span.resource = 'deliver_messages'.freeze
span.service = datadog_pin.service
span.span_type = Datadog::Ext::AppTypes::CUSTOM
span.set_tag(Ext::TAG_BUFFER_SIZE, buffer_size)
span.set_tag(Ext::TAG_CLUSTER, @cluster.cluster_info)
super # this will pass all args, including the block
end
end
end

# This will capture the number/type of messages being enqueued to be
# processed by `deliver_messages` above as spans in application traces.
#
# @param value [String] the message data.
# @param key [String] the message key.
# @param headers [Hash<String, String>] the headers for the message.
# @param topic [String] the topic that the message should be written to.
# @param partition [Integer] the partition that the message should be written to.
# @param partition_key [String] the key that should be used to assign a partition.
# @param create_time [Time] the timestamp that should be set on the message.
#
# @raise [BufferOverflow] if the maximum buffer size has been reached.
# @return [nil]
def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now)
datadog_pin.tracer.trace(Ext::SPAN_REQUEST) do |span|
span.resource = topic.to_s
span.service = datadog_pin.service
span.span_type = Datadog::Ext::AppTypes::CUSTOM
span.set_tag(Ext::TAG_PARTITION, partition)
super # this will pass all args, including the block
end
end

def datadog_pin
@datadog_pin ||= Datadog::Pin.new(
Datadog.configuration[:kafka][:service_name],
app: Ext::APP,
app_type: Datadog::Ext::AppTypes::CUSTOM,
tracer: Datadog.configuration[:kafka][:tracer]
)
end
end
end
end
end
end

0 comments on commit 4be8fef

Please sign in to comment.