Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Matrixfile
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
'karafka-latest' => '❌ 2.5 / ❌ 2.6 / ❌ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby',
# karafka 2.0.41 contains broken dependency of karafka-core 2.2.4, which depends on Ruby (>= 2.6.0)
'karafka-min' => '❌ 2.5 / ❌ 2.6 / ❌ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby',
'waterdrop' => '❌ 2.5 / ❌ 2.6 / ❌ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby'
},
'lograge' => {
'activesupport' => '✅ 2.5 / ✅ 2.6 / ✅ 2.7 / ✅ 3.0 / ✅ 3.1 / ✅ 3.2 / ✅ 3.3 / ✅ 3.4 / ✅ jruby'
Expand Down
1 change: 1 addition & 0 deletions appraisal/jruby-9.4.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
build_coverage_matrix('mongo', min: '2.1.0')
build_coverage_matrix('dalli', [2])
build_coverage_matrix('karafka', min: '2.3.0')
build_coverage_matrix('waterdrop', min: '2.8.8.rc1')

appraise 'karafka-min' do
gem 'karafka', '= 2.3.0'
Expand Down
6 changes: 2 additions & 4 deletions appraisal/ruby-3.0.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,10 @@
build_coverage_matrix('rest-client')
build_coverage_matrix('mongo', min: '2.1.0')
build_coverage_matrix('dalli', [2])
build_coverage_matrix('karafka', min: '2.3.0')
build_coverage_matrix('waterdrop', min: '2.8.8.rc1')
build_coverage_matrix('devise', min: '3.2.1')

appraise 'karafka-min' do
gem 'karafka', '= 2.3.0'
end

appraise 'relational_db' do
gem 'activerecord', '~> 7'
gem 'delayed_job'
Expand Down
1 change: 1 addition & 0 deletions appraisal/ruby-3.1.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
build_coverage_matrix('mongo', min: '2.1.0')
build_coverage_matrix('dalli', [2])
build_coverage_matrix('karafka', min: '2.3.0')
build_coverage_matrix('waterdrop', min: '2.8.8.rc1')
build_coverage_matrix('devise', min: '3.2.1')

appraise 'relational_db' do
Expand Down
1 change: 1 addition & 0 deletions appraisal/ruby-3.2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
build_coverage_matrix('mongo', min: '2.1.0')
build_coverage_matrix('dalli', [2])
build_coverage_matrix('karafka', min: '2.3.0')
build_coverage_matrix('waterdrop', min: '2.8.8.rc1')
build_coverage_matrix('devise', min: '3.2.1')

appraise 'relational_db' do
Expand Down
1 change: 1 addition & 0 deletions appraisal/ruby-3.3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
build_coverage_matrix('mongo', min: '2.1.0')
build_coverage_matrix('dalli', [2])
build_coverage_matrix('karafka', min: '2.3.0')
build_coverage_matrix('waterdrop', min: '2.8.8.rc1')
build_coverage_matrix('devise', min: '3.2.1')

appraise 'relational_db' do
Expand Down
1 change: 1 addition & 0 deletions appraisal/ruby-3.4.rb
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
build_coverage_matrix('mongo', min: '2.1.0')
build_coverage_matrix('dalli', [2])
build_coverage_matrix('karafka', min: '2.3.0')
build_coverage_matrix('waterdrop', min: '2.8.8.rc1')
build_coverage_matrix('devise', min: '3.2.1')

appraise 'relational_db' do
Expand Down
1 change: 1 addition & 0 deletions docs/Compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ For a list of available integrations, and their configuration options, refer to
| Sneakers | `sneakers` | `>= 2.12.0` | `>= 2.12.0` | [Link][46] | [Link](https://github.com/jondot/sneakers) |
| Stripe | `stripe` | `>= 5.15.0` | `>= 5.15.0` | [Link][47] | [Link](https://github.com/stripe/stripe-ruby) |
| Sucker Punch | `sucker_punch` | `>= 2.0` | `>= 2.0` | [Link][48] | [Link](https://github.com/brandonhilkert/sucker_punch) |
| WaterDrop | `waterdrop` | `>= 2.8.8.rc1` | `>= 2.8.8.rc1` | [Link][55] | [Link](https://github.com/karafka/waterdrop) |

### Support Policy

Expand Down
30 changes: 26 additions & 4 deletions docs/GettingStarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -1158,10 +1158,32 @@ end
```
`options` are the following keyword arguments:

| Key | Env Var | Type | Description | Default |
| --------------------- | ------------------------ | ------ | --------------------------------------------------- | ------- |
| `enabled` | `DD_TRACE_KARAFKA_ENABLED` | `Bool` | Specifies whether the integration should create spans. | `true` |
| `distributed_tracing` | | `Bool` | Enables [distributed tracing](#distributed-tracing). | `false` |
| Key | Env Var | Type | Description | Default |
| --------------------- | -------------------------- | ------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `enabled` | `DD_TRACE_KARAFKA_ENABLED` | `Bool` | Specifies whether the integration should create spans. | `true` |
| `distributed_tracing` | | `Bool` | Enables [distributed tracing](#distributed-tracing) (when iterating through the kafka messages, each of the messages' traces will be resumed for the duration of the block). | `false` |

### WaterDrop

The WaterDrop integration provides tracing of the `waterdrop` gem (a dependency of `karafka`, but also can be used standalone).

This integration activates automatically with the Karafka framework. If your application doesn’t use Karafka (for example, if it only produces messages consumed by another app), enable it manually with `Datadog.configure`:

```ruby
require 'waterdrop'
require 'datadog'

Datadog.configure do |c|
c.tracing.instrument :waterdrop, **options
end

```
`options` are the following keyword arguments:

| Key | Env Var | Type | Description | Default |
| --------------------- | ---------------------------- | ------ | -------------------------------------------------------------------------------------------------------------------- | ------- |
| `enabled` | `DD_TRACE_WATERDROP_ENABLED` | `Bool` | Specifies whether the integration should create spans. | `true` |
| `distributed_tracing` | | `Bool` | Enables [distributed tracing](#distributed-tracing) (the trace context will be injected onto the produced messages). | `false` |

### MongoDB

Expand Down
1 change: 1 addition & 0 deletions lib/datadog/tracing/contrib.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ module Contrib
require_relative 'contrib/httprb/integration'
require_relative 'contrib/integration'
require_relative 'contrib/kafka/integration'
require_relative 'contrib/waterdrop'
require_relative 'contrib/karafka'
require_relative 'contrib/lograge/integration'
require_relative 'contrib/mongodb/integration'
Expand Down
30 changes: 30 additions & 0 deletions lib/datadog/tracing/contrib/karafka/framework.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

module Datadog
module Tracing
module Contrib
module Karafka
# Karafka framework code, used to essentially:
# - handle configuration entries which are specific to Datadog tracing
# - instrument parts of the framework when needed
module Framework
def self.setup
Datadog.configure do |datadog_config|
karafka_config = datadog_config.tracing[:karafka]
activate_waterdrop!(datadog_config, karafka_config)
end
end

# Apply relevant configuration from Karafka to WaterDrop
def self.activate_waterdrop!(datadog_config, karafka_config)
datadog_config.tracing.instrument(
:waterdrop,
service_name: karafka_config[:service_name],
distributed_tracing: karafka_config[:distributed_tracing],
)
end
end
end
end
end
end
14 changes: 14 additions & 0 deletions lib/datadog/tracing/contrib/karafka/patcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ def each(&block)
end
end

module AppPatch
ONLY_ONCE_PER_APP = Hash.new { |h, key| h[key] = Core::Utils::OnlyOnce.new }

def initialized!
ONLY_ONCE_PER_APP[self].run do
# Activate tracing on components related to Karafka (e.g. WaterDrop)
Contrib::Karafka::Framework.setup
end
super
end
end

# Patcher enables patching of 'karafka' module.
module Patcher
include Contrib::Patcher
Expand All @@ -60,9 +72,11 @@ def target_version

def patch
require_relative 'monitor'
require_relative 'framework'

::Karafka::Instrumentation::Monitor.prepend(Monitor)
::Karafka::Messages::Messages.prepend(MessagesPatch)
::Karafka::App.singleton_class.prepend(AppPatch)
end
end
end
Expand Down
37 changes: 37 additions & 0 deletions lib/datadog/tracing/contrib/waterdrop.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true

require_relative 'component'
require_relative 'waterdrop/integration'
require_relative 'waterdrop/distributed/propagation'

module Datadog
module Tracing
module Contrib
# `WaterDrop` integration public API
module WaterDrop
def self.inject(digest, data)
raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation

@propagation.inject!(digest, data)
end

def self.extract(data)
raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation

@propagation.extract(data)
end

Contrib::Component.register('waterdrop') do |config|
tracing = config.tracing
tracing.propagation_style

@propagation = WaterDrop::Distributed::Propagation.new(
propagation_style_inject: tracing.propagation_style_inject,
propagation_style_extract: tracing.propagation_style_extract,
propagation_extract_first: tracing.propagation_extract_first
)
end
end
end
end
end
27 changes: 27 additions & 0 deletions lib/datadog/tracing/contrib/waterdrop/configuration/settings.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true

require_relative '../../configuration/settings'
require_relative '../ext'

module Datadog
module Tracing
module Contrib
module WaterDrop
module Configuration
# @public_api
class Settings < Contrib::Configuration::Settings
option :enabled do |o|
o.type :bool
o.env Ext::ENV_ENABLED
o.default true
end

option :service_name

option :distributed_tracing, default: false, type: :bool
end
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# frozen_string_literal: true

require_relative '../../../distributed/fetcher'
require_relative '../../../distributed/propagation'
require_relative '../../../distributed/b3_multi'
require_relative '../../../distributed/b3_single'
require_relative '../../../distributed/datadog'
require_relative '../../../distributed/none'
require_relative '../../../distributed/trace_context'
require_relative '../../../configuration/ext'

module Datadog
module Tracing
module Contrib
module WaterDrop
module Distributed
# Extracts and injects propagation through Kafka message headers.
class Propagation < Tracing::Distributed::Propagation
def initialize(
propagation_style_inject:,
propagation_style_extract:,
propagation_extract_first:
)
super(
propagation_styles: {
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_MULTI_HEADER =>
Tracing::Distributed::B3Multi.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_SINGLE_HEADER =>
Tracing::Distributed::B3Single.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_DATADOG =>
Tracing::Distributed::Datadog.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_TRACE_CONTEXT =>
Tracing::Distributed::TraceContext.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_BAGGAGE =>
Tracing::Distributed::Baggage.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_NONE => Tracing::Distributed::None.new
},
propagation_style_inject: propagation_style_inject,
propagation_style_extract: propagation_style_extract,
propagation_extract_first: propagation_extract_first
)
end
end
end
end
end
end
end
17 changes: 17 additions & 0 deletions lib/datadog/tracing/contrib/waterdrop/ext.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

module Datadog
module Tracing
module Contrib
module WaterDrop
module Ext
ENV_ENABLED = 'DD_TRACE_WATERDROP_ENABLED'

SPAN_PRODUCER = 'karafka.produce'

TAG_PRODUCER = 'kafka.producer'
end
end
end
end
end
43 changes: 43 additions & 0 deletions lib/datadog/tracing/contrib/waterdrop/integration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

require_relative '../integration'
require_relative 'configuration/settings'
require_relative 'patcher'

module Datadog
module Tracing
module Contrib
module WaterDrop
# Description of WaterDrop integration
class Integration
include Contrib::Integration

# WaterDrop added class-level instrumentation in version 2.8.8.rc1
MINIMUM_VERSION = Gem::Version.new('2.8.8.rc1')

register_as :waterdrop, auto_patch: false

def self.version
Gem.loaded_specs['waterdrop']&.version
end

def self.loaded?
!defined?(::WaterDrop).nil?
end

def self.compatible?
super && version >= MINIMUM_VERSION
end

def new_configuration
Configuration::Settings.new
end

def patcher
Patcher
end
end
end
end
end
end
35 changes: 35 additions & 0 deletions lib/datadog/tracing/contrib/waterdrop/middleware.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# frozen_string_literal: true

require_relative 'ext'

module Datadog
module Tracing
module Contrib
module WaterDrop
# Middleware to propagate tracing context in messages produced by WaterDrop
module Middleware
class << self
def call(message)
trace_op = Datadog::Tracing.active_trace

if trace_op && Datadog::Tracing::Distributed::PropagationPolicy.enabled?(
global_config: configuration,
trace: trace_op
)
WaterDrop.inject(trace_op.to_digest, message[:headers] ||= {})
end

message
end

private

def configuration
Datadog.configuration.tracing[:waterdrop]
end
end
end
end
end
end
end
Loading