Skip to content

Commit

Permalink
Merge branch 'main' into codeboten/rm-logging-references
Browse files Browse the repository at this point in the history
  • Loading branch information
kaylareopelle authored Oct 1, 2024
2 parents 157f565 + 7b349b1 commit 92596c3
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 19 deletions.
6 changes: 6 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ For example, to test `opentelemetry-instrumentation-action_pack` you would:
2. Install the bundle with `bundle install`
3. Run the tests with `bundle exec rake`

Note: Some test suites make use of [Appraisal](https://github.com/thoughtbot/appraisal), a library for testing against different versions of dependencies. To run tests in suites that use Appraisal:

1. Change directory to the instrumentation you'd like to test, ex: `instrumentation/action_pack`
2. Install the bundle with `bundle exec appraisal install`
3. Run the tests with `bundle exec appraisal rake test`

### Docker setup

We use Docker Compose to configure and build services used in development and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ class << base
# Module to prepend to Que singleton class
module ClassMethods
def enqueue(*args, job_options: {}, **arg_opts)
# In Que version 2.1.0 `bulk_enqueue` was introduced.
# In that case, the span is created inside the `bulk_enqueue` method.
return super(*args, **arg_opts) if gem_version >= Gem::Version.new('2.1.0') && Thread.current[:que_jobs_to_bulk_insert]

tracer = Que::Instrumentation.instance.tracer
otel_config = Que::Instrumentation.instance.config

Expand All @@ -43,19 +47,8 @@ def enqueue(*args, job_options: {}, **arg_opts)
OpenTelemetry.propagation.inject(tags, setter: TagSetter)
end

# In Que version 2.1.0 `bulk_enqueue` was introduced and in order
# for it to work, we must pass `job_options` to `bulk_enqueue` instead of enqueue.
if gem_version >= Gem::Version.new('2.1.0') && Thread.current[:que_jobs_to_bulk_insert]
Thread.current[:que_jobs_to_bulk_insert][:job_options] = Thread.current[:que_jobs_to_bulk_insert][:job_options]&.merge(tags: tags) do |_, a, b|
a.is_a?(Array) && b.is_a?(Array) ? a.concat(b) : b
end

job = super(*args, **arg_opts)
job_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs].last
else
job = super(*args, job_options: job_options.merge(tags: tags), **arg_opts)
job_attrs = job.que_attrs
end
job = super(*args, job_options: job_options.merge(tags: tags), **arg_opts)
job_attrs = job.que_attrs

span.name = "#{job_attrs[:job_class]} publish"
span.add_attributes(QueJob.job_attributes(job_attrs))
Expand All @@ -67,6 +60,32 @@ def enqueue(*args, job_options: {}, **arg_opts)
def gem_version
@gem_version ||= Gem.loaded_specs['que'].version
end

if Gem.loaded_specs['que'].version >= Gem::Version.new('2.1.0')
def bulk_enqueue(**_kwargs, &block)
tracer = Que::Instrumentation.instance.tracer
otel_config = Que::Instrumentation.instance.config

tracer.in_span('publish', kind: :producer) do |span|
super do
yield

job_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs]

unless job_attrs.empty?
span.name = "#{job_attrs.first[:job_class]} publish"
span.add_attributes(QueJob.job_attributes(job_attrs.first))
end

if otel_config[:propagation_style] != :none
job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options]
job_options[:tags] ||= []
OpenTelemetry.propagation.inject(job_options[:tags], setter: TagSetter)
end
end
end
end
end
end

def self.job_attributes(job_attrs)
Expand Down
78 changes: 73 additions & 5 deletions instrumentation/que/test/opentelemetry/instrumentation/que_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,74 @@ def self.run(first, second); end
end
end

describe 'enqueueing multiple jobs' do
it 'creates a span' do
Que.bulk_enqueue do
10.times { TestJobAsync.enqueue }
end

_(finished_spans.size).must_equal(1)

span = finished_spans.last
_(span.kind).must_equal(:producer)
end

it 'names the created span' do
Que.bulk_enqueue do
10.times { TestJobAsync.enqueue }
end

span = finished_spans.last
_(span.name).must_equal('TestJobAsync publish')
end

it 'links spans together' do
bulk_jobs = Que.bulk_enqueue do
10.times { TestJobAsync.enqueue }
end

bulk_jobs.each { |job| Que.run_job_middleware(job) { job.tap(&:_run) } }

_(finished_spans.size).must_equal(11)

publish_span = finished_spans.first

process_spans = finished_spans.drop(1)

process_spans.each do |process_span|
_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end
end

it 'records attributes' do
Que.bulk_enqueue do
10.times { TestJobAsync.enqueue }
end

attributes = finished_spans.last.attributes
_(attributes['messaging.system']).must_equal('que')
_(attributes['messaging.destination']).must_equal('default')
_(attributes['messaging.destination_kind']).must_equal('queue')
_(attributes['messaging.operation']).must_equal('publish')
_(attributes['messaging.que.job_class']).must_equal('TestJobAsync')
_(attributes['messaging.que.priority']).must_equal(100)
_(attributes.key?('messaging.message_id')).must_equal(false)
end
end

describe 'enqueueing zero jobs' do
it 'creates a span' do
Que.bulk_enqueue do
end

_(finished_spans.size).must_equal(1)
end
end

describe 'processing a job' do
before do
bulk_job = Que.bulk_enqueue do
Expand Down Expand Up @@ -363,10 +431,10 @@ def self.run(first, second); end

_(finished_spans.size).must_equal(2)

span1 = finished_spans.first
span1 = finished_spans.last
_(span1.kind).must_equal(:producer)

span2 = finished_spans.last
span2 = finished_spans.first
_(span2.kind).must_equal(:consumer)
end

Expand All @@ -375,10 +443,10 @@ def self.run(first, second); end
TestJobSync.enqueue
end

span1 = finished_spans.first
span1 = finished_spans.last
_(span1.name).must_equal('TestJobSync publish')

span2 = finished_spans.last
span2 = finished_spans.first
_(span2.name).must_equal('TestJobSync process')
end

Expand All @@ -387,7 +455,7 @@ def self.run(first, second); end
TestJobSync.enqueue
end

attributes = finished_spans.last.attributes
attributes = finished_spans.first.attributes
_(attributes['messaging.system']).must_equal('que')
_(attributes['messaging.destination']).must_equal('default')
_(attributes['messaging.destination_kind']).must_equal('queue')
Expand Down
2 changes: 1 addition & 1 deletion propagator/vitess/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Or, if you use [bundler][bundler-home], include `opentelemetry-propagator-vitess
Configure your application to use this propagator with the Trilogy client instrumentation by setting the following [environment variable][envars]:

```console
OTEL_RUBY_INSTRUMENTATION_TRILOGY_PROPAGATOR=vitess
OTEL_RUBY_INSTRUMENTATION_TRILOGY_CONFIG_OPTS=propagator=vitess
```

## How can I get involved?
Expand Down

0 comments on commit 92596c3

Please sign in to comment.