forked from DataDog/dd-trace-rb
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
1b99c5a
commit 71ffb64
Showing
6 changed files
with
173 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
require 'ddtrace/contrib/configuration/settings' | ||
require 'ddtrace/contrib/kafka/ext' | ||
|
||
module Datadog | ||
module Contrib | ||
module Kafka | ||
module Configuration | ||
# Custom settings for the Kafka integration | ||
class Settings < Contrib::Configuration::Settings | ||
option :service_name, default: Ext::SERVICE_NAME | ||
end | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
module Datadog | ||
module Contrib | ||
module Kafka | ||
# Kafka integration constants | ||
module Ext | ||
APP = 'kafka'.freeze | ||
SERVICE_NAME = 'kafka'.freeze | ||
|
||
SPAN_REQUEST = 'kafka.request'.freeze | ||
|
||
TAG_CLUSTER = 'cluster'.freeze | ||
TAG_BUFFER_SIZE = 'buffer_size'.freeze | ||
TAG_PARTITION = 'partition'.freeze | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
require 'ddtrace/contrib/integration' | ||
require 'ddtrace/contrib/kafka/configuration/settings' | ||
require 'ddtrace/contrib/kafka/patcher' | ||
|
||
module Datadog | ||
module Contrib | ||
module Kafka | ||
# Description of Kafka integration | ||
class Integration | ||
include Contrib::Integration | ||
|
||
register_as :kafka | ||
|
||
def self.version | ||
Gem.loaded_specs['ruby-kafka'] && Gem.loaded_specs['ruby-kafka'].version | ||
end | ||
|
||
def self.present? | ||
super && defined?(::Kafka) | ||
end | ||
|
||
def default_configuration | ||
Configuration::Settings.new | ||
end | ||
|
||
def patcher | ||
Patcher | ||
end | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
require 'ddtrace/contrib/patcher' | ||
require 'ddtrace/contrib/kafka/producer' | ||
|
||
module Datadog | ||
module Contrib | ||
module Kafka | ||
# Patcher enables patching of 'ruby-kafka' module. | ||
module Patcher | ||
include Contrib::Patcher | ||
|
||
module_function | ||
|
||
def patched? | ||
done?(:kafka) | ||
end | ||
|
||
def patch | ||
do_once(:kafka) do | ||
begin | ||
patch_kafka_client | ||
rescue StandardError => e | ||
Datadog::Tracer.log.error("Unable to apply kafka integration: #{e}") | ||
end | ||
end | ||
end | ||
|
||
def patch_kafka_client | ||
::Kafka::Producer.send(:include, Producer) | ||
end | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
require 'ddtrace/ext/app_types' | ||
require 'ddtrace/ext/net' | ||
require 'ddtrace/ext/sql' | ||
require 'ddtrace/contrib/kafka/ext' | ||
|
||
module Datadog | ||
module Contrib | ||
module Kafka | ||
# Kafka::Producer patch module | ||
module Producer | ||
module_function | ||
|
||
def included(base) | ||
base.send(:prepend, InstanceMethods) | ||
end | ||
|
||
# Kafka::Producer patch instance methods | ||
module InstanceMethods | ||
|
||
# This will actually capture the time we spend producing data to Kafka in background | ||
# threads by Sidekiq/API workers. | ||
# | ||
# returns nothing, takes no args | ||
def deliver_messages | ||
if buffer_size == 0 | ||
# don't bother tracing flushes with 0 in the buffer... | ||
super | ||
else | ||
datadog_pin.tracer.trace(Ext::SPAN_REQUEST) do |span| | ||
span.resource = 'deliver_messages'.freeze | ||
span.service = datadog_pin.service | ||
span.span_type = Datadog::Ext::AppTypes::CUSTOM | ||
span.set_tag(Ext::TAG_BUFFER_SIZE, buffer_size) | ||
span.set_tag(Ext::TAG_CLUSTER, @cluster.cluster_info) | ||
super # this will pass all args, including the block | ||
end | ||
end | ||
end | ||
|
||
# This will capture the number/type of messages being enqueued to be | ||
# processed by `deliver_messages` above as spans in application traces. | ||
# | ||
# @param value [String] the message data. | ||
# @param key [String] the message key. | ||
# @param headers [Hash<String, String>] the headers for the message. | ||
# @param topic [String] the topic that the message should be written to. | ||
# @param partition [Integer] the partition that the message should be written to. | ||
# @param partition_key [String] the key that should be used to assign a partition. | ||
# @param create_time [Time] the timestamp that should be set on the message. | ||
# | ||
# @raise [BufferOverflow] if the maximum buffer size has been reached. | ||
# @return [nil] | ||
def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now) | ||
datadog_pin.tracer.trace(Ext::SPAN_REQUEST) do |span| | ||
span.resource = topic.to_s | ||
span.service = datadog_pin.service | ||
span.span_type = Datadog::Ext::AppTypes::CUSTOM | ||
span.set_tag(Ext::TAG_PARTITION, partition) | ||
super # this will pass all args, including the block | ||
end | ||
end | ||
|
||
def datadog_pin | ||
@datadog_pin ||= Datadog::Pin.new( | ||
Datadog.configuration[:kafka][:service_name], | ||
app: Ext::APP, | ||
app_type: Datadog::Ext::AppTypes::CUSTOM, | ||
tracer: Datadog.configuration[:kafka][:tracer] | ||
) | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end |