Skip to content

Commit

Permalink
Merge branch '3.x-staging' into munir/remove-deprecated-code-integrat…
Browse files Browse the repository at this point in the history
…ions
  • Loading branch information
mabdinur authored Jan 31, 2025
2 parents cc81d15 + 1779b19 commit bcf8cba
Show file tree
Hide file tree
Showing 204 changed files with 1,885 additions and 12,087 deletions.
56 changes: 54 additions & 2 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,14 @@ def _get_parameters_for_new_span_directly_from_context(ctx: core.ExecutionContex
def _start_span(ctx: core.ExecutionContext, call_trace: bool = True, **kwargs) -> "Span":
span_kwargs = _get_parameters_for_new_span_directly_from_context(ctx)
call_trace = ctx.get_item("call_trace", call_trace)
tracer = (ctx.get_item("middleware") or ctx["pin"]).tracer
tracer = ctx.get_item("tracer") or (ctx.get_item("middleware") or ctx["pin"]).tracer
distributed_headers_config = ctx.get_item("distributed_headers_config")
if distributed_headers_config:
trace_utils.activate_distributed_headers(
tracer, int_config=distributed_headers_config, request_headers=ctx["distributed_headers"]
tracer,
int_config=distributed_headers_config,
request_headers=ctx["distributed_headers"],
override=ctx.get_item("distributed_headers_config_override"),
)
distributed_context = ctx.get_item("distributed_context")
if distributed_context and not call_trace:
Expand All @@ -126,6 +129,42 @@ def _start_span(ctx: core.ExecutionContext, call_trace: bool = True, **kwargs) -
return span


def _set_web_frameworks_tags(ctx, span, int_config):
    """Apply the tags shared by web framework request spans.

    Sets the component tag to ``int_config.integration_name``, the span kind
    tag to ``SpanKind.SERVER`` and the ``_SPAN_MEASURED_KEY`` tag on ``span``.
    The analytics sample rate tag is added when the context item
    ``analytics_enabled`` is ``True``, or when the global
    ``config._analytics_enabled`` is truthy and the context item is not
    explicitly ``False``.

    :param ctx: execution context providing ``analytics_enabled`` and
        ``analytics_sample_rate`` items.
    :param span: span that receives the tags.
    :param int_config: integration config exposing ``integration_name``.
    """
    span.set_tag_str(COMPONENT, int_config.integration_name)
    span.set_tag_str(SPAN_KIND, SpanKind.SERVER)
    span.set_tag(_SPAN_MEASURED_KEY)

    enabled_override = ctx.get_item("analytics_enabled")
    # The sample rate defaults to True when the context does not carry one.
    sample_rate = ctx.get_item("analytics_sample_rate", True)

    # Configure trace search sample rate: an explicit per-context True always
    # wins, an explicit False always loses, otherwise the global flag decides.
    globally_enabled = config._analytics_enabled
    if enabled_override is True or (globally_enabled and enabled_override is not False):
        span.set_tag(_ANALYTICS_SAMPLE_RATE_KEY, sample_rate)


def _on_web_framework_start_request(ctx, int_config):
    """Tag the request span stored in ``ctx`` under ``"req_span"``.

    :param ctx: execution context holding the request span as ``req_span``.
    :param int_config: integration config forwarded to the tagging helper.
    """
    _set_web_frameworks_tags(ctx, ctx.get_item("req_span"), int_config)


def _on_web_framework_finish_request(
    span, int_config, method, url, status_code, query, req_headers, res_headers, route, finish
):
    """Record HTTP metadata on ``span`` and optionally finish it.

    All request/response details are forwarded to
    ``trace_utils.set_http_meta``; the span is finished afterwards only when
    ``finish`` is truthy.

    :param span: span to annotate.
    :param int_config: integration config passed as ``integration_config``.
    :param method: HTTP method of the request.
    :param url: request URL.
    :param status_code: response status code.
    :param query: query string (callers may pass ``None``).
    :param req_headers: request headers.
    :param res_headers: response headers.
    :param route: matched route, if any.
    :param finish: when truthy, ``span.finish()`` is called after tagging.
    """
    http_meta = {
        "span": span,
        "integration_config": int_config,
        "method": method,
        "url": url,
        "status_code": status_code,
        "query": query,
        "request_headers": req_headers,
        "response_headers": res_headers,
        "route": route,
    }
    trace_utils.set_http_meta(**http_meta)

    if not finish:
        return
    span.finish()


def _on_traced_request_context_started_flask(ctx):
current_span = ctx["pin"].tracer.current_span()
if not ctx["pin"].enabled or not current_span:
Expand Down Expand Up @@ -761,6 +800,10 @@ def listen():
core.on("azure.functions.request_call_modifier", _on_azure_functions_request_span_modifier)
core.on("azure.functions.start_response", _on_azure_functions_start_response)

# web frameworks general handlers
core.on("web.request.start", _on_web_framework_start_request)
core.on("web.request.finish", _on_web_framework_finish_request)

core.on("test_visibility.enable", _on_test_visibility_enable)
core.on("test_visibility.disable", _on_test_visibility_disable)
core.on("test_visibility.is_enabled", _on_test_visibility_is_enabled, "is_enabled")
Expand All @@ -769,6 +812,14 @@ def listen():
core.on("rq.queue.enqueue_job", _propagate_context)

for context_name in (
# web frameworks
"aiohttp.request",
"bottle.request",
"cherrypy.request",
"falcon.request",
"molten.request",
"pyramid.request",
"sanic.request",
"flask.call",
"flask.jsonify",
"flask.render_template",
Expand All @@ -779,6 +830,7 @@ def listen():
"django.template.render",
"django.process_exception",
"django.func.wrapped",
# non web frameworks
"botocore.instrumented_api_call",
"botocore.instrumented_lib_function",
"botocore.patched_kinesis_api_call",
Expand Down
1 change: 1 addition & 0 deletions ddtrace/appsec/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class APPSEC(metaclass=Constant_Class):
AUTO_LOGIN_EVENTS_FAILURE_MODE: Literal[
"_dd.appsec.events.users.login.failure.auto.mode"
] = "_dd.appsec.events.users.login.failure.auto.mode"
AUTO_LOGIN_EVENTS_COLLECTION_MODE: Literal["_dd.appsec.user.collection_mode"] = "_dd.appsec.user.collection_mode"
BLOCKED: Literal["appsec.blocked"] = "appsec.blocked"
EVENT: Literal["appsec.event"] = "appsec.event"
AUTO_USER_INSTRUMENTATION_MODE: Literal[
Expand Down
10 changes: 8 additions & 2 deletions ddtrace/appsec/_iast/_evidence_redaction/_sensitive_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

from .._utils import _get_source_index
from ..constants import VULN_CMDI
from ..constants import VULN_CODE_INJECTION
from ..constants import VULN_HEADER_INJECTION
from ..constants import VULN_SQL_INJECTION
from ..constants import VULN_SSRF
from .command_injection_sensitive_analyzer import command_injection_sensitive_analyzer
from .default_sensitive_analyzer import default_sensitive_analyzer
from .header_injection_sensitive_analyzer import header_injection_sensitive_analyzer
from .sql_sensitive_analyzer import sql_sensitive_analyzer
from .url_sensitive_analyzer import url_sensitive_analyzer
Expand All @@ -19,6 +21,7 @@

REDACTED_SOURCE_BUFFER = string.ascii_letters + string.digits
LEN_SOURCE_BUFFER = len(REDACTED_SOURCE_BUFFER)
VALUE_MAX_LENGHT = 45


def get_redacted_source(length):
Expand All @@ -42,6 +45,7 @@ def __init__(self):
VULN_SQL_INJECTION: sql_sensitive_analyzer,
VULN_SSRF: url_sensitive_analyzer,
VULN_HEADER_INJECTION: header_injection_sensitive_analyzer,
VULN_CODE_INJECTION: default_sensitive_analyzer,
}

@staticmethod
Expand Down Expand Up @@ -288,7 +292,7 @@ def to_redacted_json(self, evidence_value, sensitive, tainted_ranges, sources):
return {"redacted_value_parts": value_parts, "redacted_sources": redacted_sources}

def redact_source(self, sources, redacted_sources, redacted_sources_context, source_index, start, end):
if source_index is not None:
if source_index is not None and source_index < len(sources):
if not sources[source_index].redacted:
redacted_sources.append(source_index)
sources[source_index].pattern = get_redacted_source(len(sources[source_index].value))
Expand All @@ -303,8 +307,10 @@ def write_value_part(self, value_parts, value, source_index=None):
if value:
if source_index is not None:
value_parts.append({"value": value, "source": source_index})
else:
elif len(value) < VALUE_MAX_LENGHT:
value_parts.append({"value": value})
else:
value_parts.append({"redacted": True})

def write_redacted_value_part(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from ddtrace.internal.logger import get_logger


log = get_logger(__name__)


def default_sensitive_analyzer(evidence, name_pattern, value_pattern):
    """Flag the whole evidence value as sensitive if either pattern matches.

    :param evidence: object exposing the text to inspect as ``evidence.value``.
    :param name_pattern: compiled pattern searched against the value.
    :param value_pattern: compiled pattern searched against the value.
    :return: a one-element list with a ``{"start": 0, "end": len(value)}``
        range when either pattern is found anywhere in the value, otherwise
        an empty list.
    """
    text = evidence.value
    is_sensitive = name_pattern.search(text) or value_pattern.search(text)
    if not is_sensitive:
        return []

    # No partial redaction here: a single hit marks the entire value.
    return [{"start": 0, "end": len(text)}]
1 change: 1 addition & 0 deletions ddtrace/appsec/_iast/_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ def _on_django_patch():
functools.partial(if_iast_taint_returned_object_for, OriginType.PARAMETER),
)
)

# we instrument those sources on _on_django_func_wrapped
_set_metric_iast_instrumented_source(OriginType.HEADER_NAME)
_set_metric_iast_instrumented_source(OriginType.HEADER)
Expand Down
57 changes: 55 additions & 2 deletions ddtrace/appsec/_trace_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ def track_user_login_success_event(
return
if real_mode == LOGIN_EVENTS_MODE.ANON and isinstance(user_id, str):
user_id = _hash_user_id(user_id)

span.set_tag_str(APPSEC.AUTO_LOGIN_EVENTS_COLLECTION_MODE, real_mode)
if login_events_mode != LOGIN_EVENTS_MODE.SDK:
span.set_tag_str(APPSEC.USER_LOGIN_USERID, str(user_id))
set_user(tracer, user_id, name, email, scope, role, session_id, propagate, span)
set_user(tracer, user_id, name, email, scope, role, session_id, propagate, span, may_block=False)
if in_asm_context():
res = call_waf_callback(
custom_data={
Expand Down Expand Up @@ -185,6 +185,7 @@ def track_user_login_failure_event(
if login_events_mode != LOGIN_EVENTS_MODE.SDK:
span.set_tag_str(APPSEC.USER_LOGIN_USERID, str(user_id))
span.set_tag_str("%s.failure.%s" % (APPSEC.USER_LOGIN_EVENT_PREFIX_PUBLIC, user.ID), str(user_id))
span.set_tag_str(APPSEC.AUTO_LOGIN_EVENTS_COLLECTION_MODE, real_mode)
# if called from the SDK, set the login, email and name
if login_events_mode in (LOGIN_EVENTS_MODE.SDK, LOGIN_EVENTS_MODE.AUTO):
if login:
Expand Down Expand Up @@ -376,5 +377,57 @@ def _on_django_auth(result_user, mode, kwargs, pin, info_retriever, django_confi
return False, None


def _on_django_process(result_user, mode, kwargs, pin, info_retriever, django_config):
    """Tag the root span with the authenticated user and run the WAF user check.

    Does nothing when ASM is disabled or ``mode`` is
    ``LOGIN_EVENTS_MODE.DISABLED``. Otherwise a user id is resolved (first
    from ``kwargs`` using the retriever's candidate field names, then
    overridden by whatever ``info_retriever.get_user_info`` reports). For an
    authenticated ``result_user`` the root span is tagged according to
    ``mode`` and, inside an ASM context, the WAF callback is invoked.

    :param result_user: user object; only acted on when truthy and
        ``is_authenticated``.
    :param mode: login events collection mode.
    :param kwargs: mapping searched for a user id / login field.
    :param pin: object exposing the ``tracer`` in use.
    :param info_retriever: provides candidate field names and ``get_user_info``.
    :param django_config: supplies the ``include_user_*`` switches.
    :raises BlockingException: when the WAF result contains a block or
        redirect action.
    """
    # NOTE(review): block nesting was reconstructed from a flattened listing;
    # confirm the WAF check belongs inside the authenticated-user branch.
    asm_disabled = not asm_config._asm_enabled
    if asm_disabled or mode == LOGIN_EVENTS_MODE.DISABLED:
        return

    candidate_keys = info_retriever.possible_user_id_fields + info_retriever.possible_login_fields
    # First candidate key present in kwargs wins; None when none is present.
    user_id = next((kwargs[key] for key in candidate_keys if key in kwargs), None)

    found_id, extra = info_retriever.get_user_info(
        login=django_config.include_user_login,
        email=django_config.include_user_email,
        name=django_config.include_user_realname,
    )
    if extra.get("login") is None:
        # Fall back to the id taken from kwargs as the login.
        extra["login"] = user_id
    user_id = found_id or user_id

    if not (result_user and result_user.is_authenticated):
        return

    tracer = pin.tracer
    root_span = tracer.current_root_span()
    if mode == LOGIN_EVENTS_MODE.ANON and isinstance(user_id, str):
        # Anonymous mode: only a hash of the id is recorded.
        hashed = _hash_user_id(user_id)
        root_span.set_tag_str(APPSEC.USER_LOGIN_USERID, hashed)
        root_span.set_tag_str(APPSEC.AUTO_LOGIN_EVENTS_COLLECTION_MODE, mode)
        set_user(tracer, hashed, propagate=True, may_block=False, span=root_span)
    elif mode == LOGIN_EVENTS_MODE.IDENT:
        plain_id = str(user_id)
        root_span.set_tag_str(APPSEC.USER_LOGIN_USERID, plain_id)
        root_span.set_tag_str(APPSEC.AUTO_LOGIN_EVENTS_COLLECTION_MODE, mode)
        set_user(
            tracer,
            plain_id,
            propagate=True,
            email=extra.get("email"),
            name=extra.get("name"),
            may_block=False,
            span=root_span,
        )

    if not in_asm_context():
        return

    # AUTO defers to the globally configured user event mode.
    effective_mode = asm_config._user_event_mode if mode == LOGIN_EVENTS_MODE.AUTO else mode
    waf_result = call_waf_callback(
        custom_data={
            "REQUEST_USER_ID": str(user_id) if user_id else None,
            "REQUEST_USERNAME": extra.get("login"),
            "LOGIN_SUCCESS": effective_mode,
        },
        force_sent=True,
    )
    blocking_actions = [WAF_ACTIONS.BLOCK_ACTION, WAF_ACTIONS.REDIRECT_ACTION]
    if waf_result and any(action in blocking_actions for action in waf_result.actions):
        raise BlockingException(get_blocked())


core.on("django.login", _on_django_login)
core.on("django.auth", _on_django_auth, "user")
core.on("django.process_request", _on_django_process)
2 changes: 2 additions & 0 deletions ddtrace/appsec/trace_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Public API for User events"""

from ddtrace.appsec._trace_utils import block_request # noqa: F401
from ddtrace.appsec._trace_utils import block_request_if_user_blocked # noqa: F401
from ddtrace.appsec._trace_utils import should_block_user # noqa: F401
Expand Down
105 changes: 49 additions & 56 deletions ddtrace/contrib/internal/aiohttp/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,9 @@
from aiohttp.web_urldispatcher import SystemRoute

from ddtrace import config
from ddtrace.constants import _ANALYTICS_SAMPLE_RATE_KEY
from ddtrace.constants import _SPAN_MEASURED_KEY
from ddtrace.constants import SPAN_KIND
from ddtrace.contrib import trace_utils
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import http
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal import core
from ddtrace.internal.schema import schematize_url_operation
from ddtrace.internal.schema.span_attribute_schema import SpanDirection

Expand All @@ -34,47 +29,42 @@ async def attach_context(request):
# application configs
tracer = app[CONFIG_KEY]["tracer"]
service = app[CONFIG_KEY]["service"]
distributed_tracing = app[CONFIG_KEY]["distributed_tracing_enabled"]
# Create a new context based on the propagated information.
trace_utils.activate_distributed_headers(
tracer,
int_config=config.aiohttp,
request_headers=request.headers,
override=distributed_tracing,
)

# trace the handler
request_span = tracer.trace(
schematize_url_operation("aiohttp.request", protocol="http", direction=SpanDirection.INBOUND),
service=service,
span_type=SpanTypes.WEB,
)
request_span.set_tag(_SPAN_MEASURED_KEY)

request_span.set_tag_str(COMPONENT, config.aiohttp.integration_name)

# set span.kind tag equal to type of request
request_span.set_tag_str(SPAN_KIND, SpanKind.SERVER)

# Configure trace search sample rate
# DEV: aiohttp is a special case: it maintains a configuration separate from the config API
analytics_enabled = app[CONFIG_KEY]["analytics_enabled"]
if (config._analytics_enabled and analytics_enabled is not False) or analytics_enabled is True:
request_span.set_tag(_ANALYTICS_SAMPLE_RATE_KEY, app[CONFIG_KEY].get("analytics_sample_rate", True))

# attach the context and the root span to the request; the Context
# may be freely used by the application code
request[REQUEST_CONTEXT_KEY] = request_span.context
request[REQUEST_SPAN_KEY] = request_span
request[REQUEST_CONFIG_KEY] = app[CONFIG_KEY]
try:
response = await handler(request)
if isinstance(response, web.StreamResponse):
request.task.add_done_callback(lambda _: finish_request_span(request, response))
return response
except Exception:
request_span.set_traceback()
raise
# Create a new context based on the propagated information.

with core.context_with_data(
"aiohttp.request",
span_name=schematize_url_operation("aiohttp.request", protocol="http", direction=SpanDirection.INBOUND),
span_type=SpanTypes.WEB,
service=service,
tags={},
tracer=tracer,
distributed_headers=request.headers,
distributed_headers_config=config.aiohttp,
distributed_headers_config_override=app[CONFIG_KEY]["distributed_tracing_enabled"],
headers_case_sensitive=True,
analytics_enabled=analytics_enabled,
analytics_sample_rate=app[CONFIG_KEY].get("analytics_sample_rate", True),
) as ctx:
req_span = ctx.span

ctx.set_item("req_span", req_span)
core.dispatch("web.request.start", (ctx, config.aiohttp))

# attach the context and the root span to the request; the Context
# may be freely used by the application code
request[REQUEST_CONTEXT_KEY] = req_span.context
request[REQUEST_SPAN_KEY] = req_span
request[REQUEST_CONFIG_KEY] = app[CONFIG_KEY]
try:
response = await handler(request)
if isinstance(response, web.StreamResponse):
request.task.add_done_callback(lambda _: finish_request_span(request, response))
return response
except Exception:
req_span.set_traceback()
raise

return attach_context

Expand Down Expand Up @@ -121,19 +111,22 @@ def finish_request_span(request, response):
# SystemRoute objects exist to throw HTTP errors and have no path
route = aiohttp_route.resource.canonical

trace_utils.set_http_meta(
request_span,
config.aiohttp,
method=request.method,
url=str(request.url), # DEV: request.url is a yarl's URL object
status_code=response.status,
request_headers=request.headers,
response_headers=response.headers,
route=route,
core.dispatch(
"web.request.finish",
(
request_span,
config.aiohttp,
request.method,
str(request.url), # DEV: request.url is a yarl's URL object
response.status,
None, # query arg = None
request.headers,
response.headers,
route,
True,
),
)

request_span.finish()


async def on_prepare(request, response):
"""
Expand Down
Loading

0 comments on commit bcf8cba

Please sign in to comment.