Skip to content

Commit

Permalink
Oversampling Mitigation (#366)
Browse files Browse the repository at this point in the history
* implemented oversampling mitigation

* updated begin_subsegment docstring to include sampling parameter

* created separate API for adding subsegments without sampling

* Modified add_subsegment method to log warning for orphaned subsegments

* updated unit tests

* addressing feedback

* final design changes

* remove default namespace value

* minor fix
  • Loading branch information
carolabadeer authored Nov 10, 2022
1 parent 6e30483 commit 6b64982
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 17 deletions.
2 changes: 1 addition & 1 deletion aws_xray_sdk/core/models/dummy_entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class DummySegment(Segment):
the segment based on sampling rules.
Adding data to a dummy segment becomes a no-op except for
subsegments. This is to reduce the memory footprint of the SDK.
A dummy segment will not be sent to the X-Ray daemon. Manually create
A dummy segment will not be sent to the X-Ray daemon. Manually creating
dummy segments is not recommended.
"""

Expand Down
4 changes: 4 additions & 0 deletions aws_xray_sdk/core/models/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def add_subsegment(self, subsegment):
"""
self._check_ended()
subsegment.parent_id = self.id

if not self.sampled and subsegment.sampled:
log.warning("This sampled subsegment is being added to an unsampled parent segment/subsegment and will be orphaned.")

self.subsegments.append(subsegment)

def remove_subsegment(self, subsegment):
Expand Down
49 changes: 35 additions & 14 deletions aws_xray_sdk/core/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,16 +275,10 @@ def current_segment(self):
else:
return entity

def begin_subsegment(self, name, namespace='local'):
"""
Begin a new subsegment.
If there is an open subsegment, the newly created subsegment will be the
child of the latest opened subsegment.
If not, it will be the child of the current open segment.
:param str name: the name of the subsegment.
:param str namespace: currently can only be 'local', 'remote', 'aws'.
"""
def _begin_subsegment_helper(self, name, namespace='local', beginWithoutSampling=False):
'''
Helper method to begin_subsegment and begin_subsegment_without_sampling
'''
# Generating the parent dummy segment is necessary.
# We don't need to store anything in context. Assumption here
# is that we only work with recorder-level APIs.
Expand All @@ -295,16 +289,42 @@ def begin_subsegment(self, name, namespace='local'):
if not segment:
log.warning("No segment found, cannot begin subsegment %s." % name)
return None

if not segment.sampled:

current_entity = self.get_trace_entity()
if not current_entity.sampled or beginWithoutSampling:
subsegment = DummySubsegment(segment, name)
else:
subsegment = Subsegment(name, namespace, segment)

self.context.put_subsegment(subsegment)

return subsegment



def begin_subsegment(self, name, namespace='local'):
    """
    Open a new subsegment through the shared subsegment helper.

    When a subsegment is already open, the new one becomes the child of
    the most recently opened subsegment; otherwise it becomes the child
    of the currently open segment.

    :param str name: the name of the subsegment.
    :param str namespace: currently can only be 'local', 'remote', 'aws'.
    :return: the value produced by ``_begin_subsegment_helper`` for this
        name and namespace.
    """
    new_subsegment = self._begin_subsegment_helper(name, namespace)
    return new_subsegment


def begin_subsegment_without_sampling(self, name):
    """
    Open a new subsegment that is explicitly requested to be unsampled.

    When a subsegment is already open, the new one becomes the child of
    the most recently opened subsegment; otherwise it becomes the child
    of the currently open segment.

    :param str name: the name of the subsegment.
    :return: the value produced by ``_begin_subsegment_helper`` when it is
        called with ``beginWithoutSampling=True``.
    """
    unsampled_subsegment = self._begin_subsegment_helper(
        name,
        beginWithoutSampling=True,
    )
    return unsampled_subsegment

def current_subsegment(self):
"""
Return the latest opened subsegment. In a multithreading environment,
Expand Down Expand Up @@ -487,7 +507,8 @@ def _send_segment(self):

def _stream_subsegment_out(self, subsegment):
    """
    Hand a subsegment to the emitter, but only when it is sampled.

    :param subsegment: the subsegment to stream out. It is passed to
        ``self.emitter.send_entity`` exactly once when its ``sampled``
        attribute is truthy, and not at all otherwise.
    """
    log.debug("streaming subsegments...")

    # The entity must be sent at most once and never when unsampled: an
    # unconditional send ahead of this guard would emit unsampled
    # subsegments and duplicate sampled ones.
    if not subsegment.sampled:
        return

    self.emitter.send_entity(subsegment)

def _load_sampling_rules(self, sampling_rules):

Expand Down
11 changes: 11 additions & 0 deletions aws_xray_sdk/core/utils/sqs_message_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Name of the SQS message attribute that carries the X-Ray trace header.
SQS_XRAY_HEADER = "AWSTraceHeader"


class SqsMessageHelper:
    """Helpers for reading X-Ray trace information from an SQS message record."""

    @staticmethod
    def isSampled(sqs_message):
        """
        Tell whether the trace header carried by an SQS message is sampled.

        :param dict sqs_message: a single SQS record. The trace header is
            looked up at ``sqs_message['attributes'][SQS_XRAY_HEADER]``.
        :return: ``True`` only when the header has a ``Sampled=1`` field.
            ``False`` when the record has no attributes, no trace header,
            or any other sampling value.
        :rtype: bool
        """
        # A record without attributes simply has no trace header; report it
        # as unsampled instead of raising KeyError.
        attributes = sqs_message.get('attributes') or {}
        trace_header = attributes.get(SQS_XRAY_HEADER)
        if not trace_header:
            return False

        # Compare whole ';'-separated fields so that values which merely
        # contain the text (e.g. 'Sampled=10') are not treated as sampled.
        return any(
            field.strip() == 'Sampled=1'
            for field in trace_header.split(';')
        )
1 change: 0 additions & 1 deletion aws_xray_sdk/ext/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def inject_trace_header(headers, entity):
else:
header = entity.get_origin_trace_header()
data = header.data if header else None

to_insert = TraceHeader(
root=entity.trace_id,
parent=entity.id,
Expand Down
15 changes: 15 additions & 0 deletions tests/test_facade_segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,18 @@ def test_structure_intact():

assert segment.subsegments[0] is subsegment
assert subsegment.subsegments[0] is subsegment2

def test_adding_unsampled_subsegment():
    """A subsegment whose ``sampled`` flag is cleared can be nested and keeps that flag."""
    root = FacadeSegment('name', 'id', 'id', True)
    sampled_child = Subsegment('sampled', 'local', root)
    unsampled_grandchild = Subsegment('unsampled', 'local', root)
    unsampled_grandchild.sampled = False

    # Build the chain root -> sampled_child -> unsampled_grandchild.
    root.add_subsegment(sampled_child)
    sampled_child.add_subsegment(unsampled_grandchild)

    first_of_root = root.subsegments[0]
    first_of_child = sampled_child.subsegments[0]

    assert first_of_root is sampled_child
    assert first_of_child is unsampled_grandchild
    assert unsampled_grandchild.sampled == False
35 changes: 35 additions & 0 deletions tests/test_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,24 @@ def test_first_begin_segment_sampled():

assert segment.sampled

def test_unsampled_subsegment_of_sampled_parent():
    """``begin_subsegment_without_sampling`` yields an unsampled child under a sampled segment."""
    recorder = get_new_stubbed_recorder()
    recorder.configure(sampling=True)

    parent = recorder.begin_segment('name', sampling=True)
    child = recorder.begin_subsegment_without_sampling('unsampled')

    assert parent.sampled == True
    assert child.sampled == False

def test_begin_subsegment_unsampled():
    """An unsampled segment and its explicitly unsampled subsegment both report ``sampled == False``."""
    recorder = get_new_stubbed_recorder()
    recorder.configure(sampling=False)

    parent = recorder.begin_segment('name', sampling=False)
    child = recorder.begin_subsegment_without_sampling('unsampled')

    assert parent.sampled == False
    assert child.sampled == False


def test_in_segment_closing():
xray_recorder = get_new_stubbed_recorder()
Expand Down Expand Up @@ -201,6 +219,23 @@ def test_disable_is_dummy():
assert type(xray_recorder.current_segment()) is DummySegment
assert type(xray_recorder.current_subsegment()) is DummySubsegment

def test_unsampled_subsegment_is_dummy():
    """The current subsegment after ``begin_subsegment_without_sampling`` is a ``DummySubsegment``."""
    assert global_sdk_config.sdk_enabled()

    # The returned entities are not needed; only the recorder's current
    # subsegment is inspected.
    xray_recorder.begin_segment('name')
    xray_recorder.begin_subsegment_without_sampling('name')

    current = xray_recorder.current_subsegment()
    assert type(current) is DummySubsegment

def test_subsegment_respects_parent_sampling_decision():
    """A subsegment begun under an unsampled subsegment is itself unsampled."""
    assert global_sdk_config.sdk_enabled()

    xray_recorder.begin_segment('name')
    unsampled_parent = xray_recorder.begin_subsegment_without_sampling('name2')
    inherited_child = xray_recorder.begin_subsegment('unsampled-subsegment')

    current_type = type(xray_recorder.current_subsegment())

    assert current_type is DummySubsegment
    assert unsampled_parent.sampled == False
    assert inherited_child.sampled == False


def test_disabled_empty_context_current_calls():
global_sdk_config.set_sdk_enabled(False)
Expand Down
68 changes: 68 additions & 0 deletions tests/test_sqs_message_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from aws_xray_sdk.core.utils.sqs_message_helper import SqsMessageHelper

import pytest

# Fixture: an SQS event with three records that differ in their
# "AWSTraceHeader" attribute. The tests below index into "Records" by position.
sampleSqsMessageEvent = {
"Records": [
# Records[0]: trace header ends with "Sampled=1".
{
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082649183",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082649185",
"AWSTraceHeader":"Root=1-632BB806-bd862e3fe1be46a994272793;Sampled=1"
},
"messageAttributes": {},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
"awsRegion": "us-east-2"
},
# Records[1]: trace header ends with "Sampled=0".
{
"messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
"receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082650636",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082650649",
"AWSTraceHeader":"Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=0"
},
"messageAttributes": {},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
"awsRegion": "us-east-2"
},
# Records[2]: trace header has Root and Parent but no "Sampled" field.
{
"messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
"receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082650636",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082650649",
"AWSTraceHeader":"Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8"
},
"messageAttributes": {},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
"awsRegion": "us-east-2"
}
]
}

def test_return_true_when_sampling_1():
    """The first fixture record carries ``Sampled=1`` and is reported as sampled."""
    sampled_record = sampleSqsMessageEvent['Records'][0]
    outcome = SqsMessageHelper.isSampled(sampled_record)
    assert outcome == True

def test_return_false_when_sampling_0():
    """The second fixture record carries ``Sampled=0`` and is reported as not sampled."""
    unsampled_record = sampleSqsMessageEvent['Records'][1]
    outcome = SqsMessageHelper.isSampled(unsampled_record)
    assert outcome == False

def test_return_false_with_no_sampling_flag():
    """The third fixture record has no ``Sampled`` field and is reported as not sampled."""
    flagless_record = sampleSqsMessageEvent['Records'][2]
    outcome = SqsMessageHelper.isSampled(flagless_record)
    assert outcome == False
19 changes: 19 additions & 0 deletions tests/test_trace_entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
from aws_xray_sdk.core.exceptions.exceptions import AlreadyEndedException

from .util import entity_to_dict
from .util import get_new_stubbed_recorder

xray_recorder = get_new_stubbed_recorder()


def test_unicode_entity_name():
Expand Down Expand Up @@ -263,3 +266,19 @@ def test_add_exception_appending_exceptions():

assert isinstance(segment.cause, dict)
assert len(segment.cause['exceptions']) == 2

def test_adding_subsegments_with_recorder():
    """
    Check sampling flags along a chain of subsegments opened via the recorder.

    The recorder is configured with ``sampling=False``. The segment and the
    first regular subsegment are expected to report ``sampled == True``;
    the explicitly unsampled subsegment and the regular subsegment opened
    beneath it are expected to report ``sampled == False``.
    """
    xray_recorder.configure(sampling=False)
    xray_recorder.clear_trace_entities()

    try:
        segment = xray_recorder.begin_segment('parent')
        subsegment = xray_recorder.begin_subsegment('sampled-child')
        unsampled_subsegment = xray_recorder.begin_subsegment_without_sampling('unsampled-child1')
        unsampled_child_subsegment = xray_recorder.begin_subsegment('unsampled-child2')

        assert segment.sampled == True
        assert subsegment.sampled == True
        assert unsampled_subsegment.sampled == False
        assert unsampled_child_subsegment.sampled == False
    finally:
        # The recorder is shared at module level; always reset it so a
        # failing assertion cannot leave entities behind for other tests.
        xray_recorder.clear_trace_entities()
43 changes: 42 additions & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
from aws_xray_sdk.ext.util import to_snake_case, get_hostname, strip_url
from aws_xray_sdk.ext.util import to_snake_case, get_hostname, strip_url, inject_trace_header
from aws_xray_sdk.core.models.segment import Segment
from aws_xray_sdk.core.models.subsegment import Subsegment
from aws_xray_sdk.core.models.dummy_entities import DummySegment, DummySubsegment
from .util import get_new_stubbed_recorder

xray_recorder = get_new_stubbed_recorder()

UNKNOWN_HOST = "UNKNOWN HOST"

Expand Down Expand Up @@ -52,3 +58,38 @@ def test_strip_url():

assert strip_url("") == ""
assert not strip_url(None)


def test_inject_trace_header_unsampled():
    """Injecting from an explicitly unsampled subsegment writes ``Sampled=0`` into the trace header."""
    recorder = get_new_stubbed_recorder()
    recorder.configure(sampling=True)
    recorder.begin_segment('name', sampling=True)
    unsampled = recorder.begin_subsegment_without_sampling('unsampled')

    # The incoming header starts out as Sampled=1 and is expected to be
    # replaced by the injected one.
    request_headers = {'host': 'test', 'accept': '*/*', 'connection': 'keep-alive', 'X-Amzn-Trace-Id': 'Root=1-6369739a-7d8bb07e519b795eb24d382d;Parent=089e3de743fb9e79;Sampled=1'}
    inject_trace_header(request_headers, unsampled)

    injected = request_headers['X-Amzn-Trace-Id']
    assert 'Sampled=0' in injected

def test_inject_trace_header_respects_parent_subsegment():
    """A subsegment opened under an unsampled subsegment also injects ``Sampled=0``."""
    recorder = get_new_stubbed_recorder()
    recorder.configure(sampling=True)
    recorder.begin_segment('name', sampling=True)
    recorder.begin_subsegment_without_sampling('unsampled')
    nested = recorder.begin_subsegment('unsampled2')

    request_headers = {'host': 'test', 'accept': '*/*', 'connection': 'keep-alive', 'X-Amzn-Trace-Id': 'Root=1-6369739a-7d8bb07e519b795eb24d382d;Parent=089e3de743fb9e79;Sampled=1'}
    inject_trace_header(request_headers, nested)

    injected = request_headers['X-Amzn-Trace-Id']
    assert 'Sampled=0' in injected

def test_inject_trace_header_sampled():
    """
    Injecting from a regular subsegment of a sampled segment writes ``Sampled=1``.

    The segment is started with ``sampling=True``, as in the sibling
    header-injection tests, so the outcome does not depend on an implicit
    sampling decision.
    """
    headers = {'host': 'test', 'accept': '*/*', 'connection': 'keep-alive', 'X-Amzn-Trace-Id': 'Root=1-6369739a-7d8bb07e519b795eb24d382d;Parent=089e3de743fb9e79;Sampled=1'}
    xray_recorder = get_new_stubbed_recorder()
    xray_recorder.configure(sampling=True)
    xray_recorder.begin_segment('name', sampling=True)
    # Named 'sampled' because this subsegment is the sampled case under test.
    subsegment = xray_recorder.begin_subsegment('sampled')

    inject_trace_header(headers, subsegment)

    assert 'Sampled=1' in headers['X-Amzn-Trace-Id']

0 comments on commit 6b64982

Please sign in to comment.