diff --git a/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/handler.rb b/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/handler.rb index a973adeb5..c0b040956 100644 --- a/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/handler.rb +++ b/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/handler.rb @@ -9,36 +9,32 @@ module Instrumentation module AwsSdk # Generates Spans for all interactions with AwsSdk class Handler < Seahorse::Client::Handler - SQS_SEND_MESSAGE = 'SQS.SendMessage' - SQS_SEND_MESSAGE_BATCH = 'SQS.SendMessageBatch' - SQS_RECEIVE_MESSAGE = 'SQS.ReceiveMessage' - SNS_PUBLISH = 'SNS.Publish' - def call(context) return super unless context - service_name = service_name(context) + service_id = service_name(context) operation = context.operation&.name - client_method = "#{service_name}.#{operation}" - attributes = { - 'aws.region' => context.config.region, - OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api', - OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => operation, - OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => service_name - } - attributes[SemanticConventions::Trace::DB_SYSTEM] = 'dynamodb' if service_name == 'DynamoDB' - MessagingHelper.apply_sqs_attributes(attributes, context, client_method) if service_name == 'SQS' - MessagingHelper.apply_sns_attributes(attributes, context, client_method) if service_name == 'SNS' + client_method = "#{service_id}.#{operation}" + + tracer.in_span( + span_name(context, client_method, service_id), + attributes: attributes(context, client_method, service_id, operation), + kind: span_kind(client_method, service_id) + ) do |span| + if instrumentation_config[:inject_messaging_context] && + %w[SQS SNS].include?(service_id) + MessagingHelper.inject_context(context, client_method) + end - tracer.in_span(span_name(context, client_method), attributes: attributes, kind: span_kind(client_method)) do |span| - inject_context(context, client_method) if instrumentation_config[:suppress_internal_instrumentation] OpenTelemetry::Common::Utilities.untraced { super } else super end.tap do |response| - span.set_attribute(OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE, - context.http_response.status_code) + span.set_attribute( + OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE, + context.http_response.status_code + ) if (err = response.error) span.record_exception(err) @@ -65,48 +61,40 @@ def service_name(context) context.client.class.api.metadata['serviceId'] || context.client.class.to_s.split('::')[1] end - SEND_MESSAGE_CLIENT_METHODS = [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].freeze - def inject_context(context, client_method) - return unless SEND_MESSAGE_CLIENT_METHODS.include? client_method - return unless instrumentation_config[:inject_messaging_context] - - if client_method == SQS_SEND_MESSAGE_BATCH - context.params[:entries].each do |entry| - entry[:message_attributes] ||= {} - OpenTelemetry.propagation.inject(entry[:message_attributes], setter: MessageAttributeSetter) - end + def span_kind(client_method, service_id) + case service_id + when 'SQS', 'SNS' + MessagingHelper.span_kind(client_method) else - context.params[:message_attributes] ||= {} - OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter) + OpenTelemetry::Trace::SpanKind::CLIENT end end - def span_kind(client_method) - case client_method - when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH - OpenTelemetry::Trace::SpanKind::PRODUCER - when SQS_RECEIVE_MESSAGE - OpenTelemetry::Trace::SpanKind::CONSUMER + def span_name(context, client_method, service_id) + case service_id + when 'SQS', 'SNS' + MessagingHelper.legacy_span_name(context, client_method) else - OpenTelemetry::Trace::SpanKind::CLIENT + client_method end end - def span_name(context, client_method) - case client_method - when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH - "#{MessagingHelper.queue_name(context)} publish" - when SQS_RECEIVE_MESSAGE - "#{MessagingHelper.queue_name(context)} receive" - else - client_method + def attributes(context, client_method, service_id, operation) + { + 'aws.region' => context.config.region, + OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api', + OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => operation, + OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => service_id + }.tap do |attrs| + attrs[SemanticConventions::Trace::DB_SYSTEM] = 'dynamodb' if service_id == 'DynamoDB' + MessagingHelper.apply_span_attributes(context, attrs, client_method, service_id) if %w[SQS SNS].include?(service_id) end end end # A Seahorse::Client::Plugin that enables instrumentation for all AWS services class Plugin < Seahorse::Client::Plugin - def add_handlers(handlers, config) + def add_handlers(handlers, _config) # run before Seahorse::Client::Plugin::ParamValidator (priority 50) handlers.add Handler, step: :validate, priority: 49 end diff --git a/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/instrumentation.rb b/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/instrumentation.rb index 07b4075f2..928b9b98e 100644 --- a/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/instrumentation.rb +++ b/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/instrumentation.rb @@ -13,7 +13,7 @@ class Instrumentation < OpenTelemetry::Instrumentation::Base install do |_config| require_dependencies - add_plugin(Seahorse::Client::Base, *loaded_constants) + add_plugins(Seahorse::Client::Base, *loaded_service_clients) end present do @@ -41,31 +41,39 @@ def gem_version def require_dependencies require_relative 'handler' - require_relative 'services' require_relative 'message_attributes' require_relative 'messaging_helper' end - def add_plugin(*targets) + def add_plugins(*targets) targets.each { |klass| klass.add_plugin(AwsSdk::Plugin) } end - def loaded_constants - # Cross-check services against loaded AWS constants - # Module#const_get can return a constant from ancestors when there's a miss. - # If this conincidentally matches another constant, it will attempt to patch - # the wrong constant, resulting in patch failure. - available_services = ::Aws.constants & SERVICES.map(&:to_sym) - available_services.each_with_object([]) do |service, constants| - next if ::Aws.autoload?(service) + def loaded_service_clients + ::Aws.constants.each_with_object([]) do |c, constants| + m = ::Aws.const_get(c) + next unless loaded_service?(c, m) begin - constants << ::Aws.const_get(service, false).const_get(:Client, false) + constants << m.const_get(:Client) rescue StandardError => e OpenTelemetry.logger.warn("Constant could not be loaded: #{e}") end end end + + # This check does the following: + # 1 - Checks if the service client is autoload or not + # 2 - Validates whether if is a service client + # note that Seahorse::Client::Base is a superclass for V3 clients + # but for V2, it is Aws::Client + def loaded_service?(constant, service_module) + !::Aws.autoload?(constant) && + service_module.is_a?(Module) && + service_module.const_defined?(:Client) && + (service_module.const_get(:Client).superclass == Seahorse::Client::Base || + service_module.const_get(:Client).superclass == Aws::Client) + end end end end diff --git a/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/messaging_helper.rb b/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/messaging_helper.rb index e6fb8d0e4..27064aa74 100644 --- a/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/messaging_helper.rb +++ b/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/messaging_helper.rb @@ -7,9 +7,15 @@ module OpenTelemetry module Instrumentation module AwsSdk - # MessagingHelper class provides methods for calculating messaging span attributes + # An utility class to help SQS/SNS-related span attributes/context injection class MessagingHelper class << self + SQS_SEND_MESSAGE = 'SQS.SendMessage' + SQS_SEND_MESSAGE_BATCH = 'SQS.SendMessageBatch' + SQS_RECEIVE_MESSAGE = 'SQS.ReceiveMessage' + SNS_PUBLISH = 'SNS.Publish' + SEND_MESSAGE_CLIENT_METHODS = [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].freeze + def queue_name(context) topic_arn = context.params[:topic_arn] target_arn = context.params[:target_arn] @@ -28,19 +34,64 @@ def queue_name(context) 'unknown' end + def legacy_span_name(context, client_method) + case client_method + when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH + "#{MessagingHelper.queue_name(context)} publish" + when SQS_RECEIVE_MESSAGE + "#{MessagingHelper.queue_name(context)} receive" + else + client_method + end + end + + def apply_span_attributes(context, attrs, client_method, service_id) + case service_id + when 'SQS' + apply_sqs_attributes(attrs, context, client_method) + when 'SNS' + apply_sns_attributes(attrs, context, client_method) + end + end + + def span_kind(client_method) + case client_method + when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH + OpenTelemetry::Trace::SpanKind::PRODUCER + when SQS_RECEIVE_MESSAGE + OpenTelemetry::Trace::SpanKind::CONSUMER + else + OpenTelemetry::Trace::SpanKind::CLIENT + end + end + + def inject_context(context, client_method) + return unless SEND_MESSAGE_CLIENT_METHODS.include?(client_method) + + if client_method == SQS_SEND_MESSAGE_BATCH + context.params[:entries].each do |entry| + entry[:message_attributes] ||= {} + OpenTelemetry.propagation.inject(entry[:message_attributes], setter: MessageAttributeSetter) + end + else + context.params[:message_attributes] ||= {} + OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter) + end + end + + private + def apply_sqs_attributes(attributes, context, client_method) attributes[SemanticConventions::Trace::MESSAGING_SYSTEM] = 'aws.sqs' attributes[SemanticConventions::Trace::MESSAGING_DESTINATION_KIND] = 'queue' attributes[SemanticConventions::Trace::MESSAGING_DESTINATION] = queue_name(context) attributes[SemanticConventions::Trace::MESSAGING_URL] = context.params[:queue_url] if context.params[:queue_url] - - attributes[SemanticConventions::Trace::MESSAGING_OPERATION] = 'receive' if client_method == 'SQS.ReceiveMessage' + attributes[SemanticConventions::Trace::MESSAGING_OPERATION] = 'receive' if client_method == SQS_RECEIVE_MESSAGE end def apply_sns_attributes(attributes, context, client_method) attributes[SemanticConventions::Trace::MESSAGING_SYSTEM] = 'aws.sns' - - return unless client_method == 'SNS.Publish' + return unless client_method == SNS_PUBLISH attributes[SemanticConventions::Trace::MESSAGING_DESTINATION_KIND] = 'topic' attributes[SemanticConventions::Trace::MESSAGING_DESTINATION] = queue_name(context) diff --git a/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/services.rb b/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/services.rb deleted file mode 100644 index d0bacb75f..000000000 --- a/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/services.rb +++ /dev/null @@ -1,121 +0,0 @@ -# frozen_string_literal: true - -module OpenTelemetry - module Instrumentation - # rubocop:disable Metrics/ModuleLength: - module AwsSdk - SERVICES = %w[ - ACM - APIGateway - AppStream - ApplicationAutoScaling - ApplicationDiscoveryService - Athena - AutoScaling - Batch - Budgets - CloudDirectory - CloudFormation - CloudFront - CloudHSM - CloudHSMV2 - CloudSearch - CloudSearchDomain - CloudTrail - CloudWatch - CloudWatchEvents - CloudWatchLogs - CodeBuild - CodeCommit - CodeDeploy - CodePipeline - CodeStar - CognitoIdentity - CognitoIdentityProvider - CognitoSync - ConfigService - CostandUsageReportService - DAX - DataPipeline - DatabaseMigrationService - DeviceFarm - DirectConnect - DirectoryService - DynamoDB - DynamoDBStreams - EC2 - ECR - ECS - EFS - EMR - ElastiCache - ElasticBeanstalk - ElasticLoadBalancing - ElasticLoadBalancingV2 - ElasticTranscoder - ElasticsearchService - EventBridge - Firehose - GameLift - Glacier - Glue - Greengrass - Health - IAM - ImportExport - Inspector - IoT - IoTDataPlane - KMS - Kinesis - KinesisAnalytics - Lambda - LambdaPreview - Lex - LexModelBuildingService - Lightsail - MTurk - MachineLearning - MarketplaceCommerceAnalytics - MarketplaceEntitlementService - MarketplaceMetering - MigrationHub - Mobile - OpsWorks - OpsWorksCM - Organizations - Pinpoint - Polly - RDS - Redshift - Rekognition - ResourceGroupsTaggingAPI - Route53 - Route53Domains - S3 - SES - SMS - SNS - SQS - SSM - STS - SWF - ServiceCatalog - Schemas - Shield - SimpleDB - Snowball - States - StorageGateway - Support - Textract - WAF - WAFRegional - WorkDocs - WorkSpaces - XRay - ].freeze - end - # rubocop:enable Metrics/ModuleLength: - end -end diff --git a/instrumentation/aws_sdk/opentelemetry-instrumentation-aws_sdk.gemspec b/instrumentation/aws_sdk/opentelemetry-instrumentation-aws_sdk.gemspec index 0c84297d7..6562625c6 100644 --- a/instrumentation/aws_sdk/opentelemetry-instrumentation-aws_sdk.gemspec +++ b/instrumentation/aws_sdk/opentelemetry-instrumentation-aws_sdk.gemspec @@ -19,9 +19,10 @@ Gem::Specification.new do |spec| spec.homepage = 'https://github.com/open-telemetry/opentelemetry-ruby-contrib' spec.license = 'Apache-2.0' - spec.files = Dir.glob('lib/**/*.rb') + - Dir.glob('*.md') + - ['LICENSE', '.yardopts'] + spec.files = + Dir.glob('lib/**/*.rb') + + Dir.glob('*.md') + + ['LICENSE', '.yardopts'] spec.require_paths = ['lib'] spec.required_ruby_version = '>= 3.0'