Skip to content
18 changes: 10 additions & 8 deletions ddtrace/internal/coverage/instrumentation_py3_12.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _instrument_all_lines_with_monitoring(
current_import_name: t.Optional[str] = None
current_import_package: t.Optional[str] = None

line = 0
line: t.Optional[int] = None

ext: list[bytes] = []
code_iter = iter(enumerate(code.co_code))
Expand All @@ -89,12 +89,14 @@ def _instrument_all_lines_with_monitoring(

if offset in linestarts:
line = linestarts[offset]
lines.add(line)
# Skip if line is None (bytecode that doesn't map to a specific source line)
if line is not None:
lines.add(line)

# Make sure that the current module is marked as depending on its own package by instrumenting the
# first executable line
if code.co_name == "<module>" and len(lines) == 1 and package is not None:
import_names[line] = (package, ("",))
# Make sure that the current module is marked as depending on its own package by instrumenting the
# first executable line
if code.co_name == "<module>" and len(lines) == 1 and package is not None:
import_names[line] = (package, ("",))

if opcode is EXTENDED_ARG:
ext.append(arg)
Expand All @@ -105,7 +107,7 @@ def _instrument_all_lines_with_monitoring(
current_arg = int.from_bytes([*ext, arg], "big", signed=False)
ext.clear()

if opcode == IMPORT_NAME:
if opcode == IMPORT_NAME and line is not None:
import_depth: int = code.co_consts[_previous_previous_arg]
current_import_name: str = code.co_names[current_arg]
# Adjust package name if the import is relative and a parent (ie: if depth is more than 1)
Expand All @@ -124,7 +126,7 @@ def _instrument_all_lines_with_monitoring(
# Also track import from statements since it's possible that the "from" target is a module, eg:
# from my_package import my_module
# Since the package has not changed, we simply extend the previous import names with the new value
if opcode == IMPORT_FROM:
if opcode == IMPORT_FROM and line is not None:
import_from_name = f"{current_import_name}.{code.co_names[current_arg]}"
if line in import_names:
import_names[line] = (
Expand Down
113 changes: 113 additions & 0 deletions tests/coverage/included_path/synthetic_opcodes_module.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""
Module with various code patterns that generate code without line numbers.

Python 3.12+ may generate None line numbers for:
- Comprehensions
- Lambda functions
- Nested function definitions
- Generator expressions
"""


def test_comprehensions(items):
"""Test list/dict/set comprehensions which can generate synthetic opcodes."""
# List comprehension
doubled = [x * 2 for x in items]

# Dict comprehension
squared_dict = {x: x**2 for x in items} # noqa: F841

# Set comprehension
unique_doubled = {x * 2 for x in items} # noqa: F841

# Nested comprehension
nested = [[y * 2 for y in range(x)] for x in items[:3]] # noqa: F841

return doubled


def test_lambda(value):
"""Test lambda functions which may have synthetic opcodes."""
# Simple lambda
square = lambda x: x * x # noqa: E731

# Lambda with conditional
safe_divide = lambda x, y: x / y if y != 0 else 0 # noqa: E731, F841

# Lambda with complex expression
complex_calc = lambda x: (x + 1) * (x + 2) * (x + 3) if x > 0 else 0 # noqa: E731, F841

return square(value)


def test_nested_functions(value):
"""Test nested function definitions."""

def outer(x):
def inner(y):
def innermost(z):
return z * 3

return innermost(y) + x

return inner(x)

result = outer(value)
return result


def test_generator_expression():
"""Test generator expressions which may have synthetic opcodes."""
# Simple generator
gen = (x * 2 for x in range(10))

# Generator with filter
filtered_gen = (x * 2 for x in range(10) if x % 2 == 0) # noqa: F841

# Nested generator
nested_gen = ((x, y) for x in range(3) for y in range(3)) # noqa: F841

return list(gen)


def test_walrus_operator(items):
"""Test walrus operator (Python 3.8+) which may generate synthetic opcodes."""
results = []

# Walrus in comprehension
filtered = [y for x in items if (y := x * 2) > 5]

# Walrus in while loop
while (n := len(results)) < 5:
results.append(n)

return filtered


class TestClass:
"""Class with various methods that may generate synthetic opcodes."""

def __init__(self, value):
self.value = value
# Comprehension in __init__
self.doubled_range = [x * 2 for x in range(value)]

def method_with_lambda(self):
"""Method that uses lambda."""
transform = lambda x: x * self.value # noqa: E731, F841
return [i * self.value for i in range(5)]

@property
def computed_property(self):
"""Property with comprehension."""
return sum(x * 2 for x in range(self.value))


# Module-level comprehension (executed at import time)
MODULE_LEVEL_COMP = [x * 3 for x in range(5)]

# Module-level lambda
MODULE_LEVEL_LAMBDA = lambda x: x * 2 # noqa: E731

# Module-level generator
MODULE_LEVEL_GEN = (x for x in range(10))
164 changes: 164 additions & 0 deletions tests/coverage/test_coverage_py312_synthetic_opcodes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
"""
Test coverage tracking for Python 3.12+ with opcodes not mapped to line numbers.

Python 3.12+ can generate opcodes with None line numbers in linestarts.
This happens for compiler-generated code like:
- Comprehensions
- Lambda functions
- Certain optimized bytecode patterns
- Decorator application
- Class definition helpers

Without proper None handling, this causes:
TypeError: unsupported operand type(s) for //: 'NoneType' and 'int'

This test ensures we handle these cases gracefully by checking line is not None.
"""

import sys

import pytest


@pytest.mark.skipif(sys.version_info < (3, 12), reason="Test specific to Python 3.12+ monitoring API")
@pytest.mark.subprocess(
env={
"DD_CIVISIBILITY_AGENTLESS_ENABLED": "1",
"DD_API_KEY": "foobar.baz",
}
)
def test_coverage_with_synthetic_opcodes():
"""Test that coverage tracking handles synthetic opcodes with None line numbers."""
import os
from pathlib import Path

from ddtrace.internal.coverage.code import ModuleCodeCollector
from ddtrace.internal.coverage.installer import install

cwd_path = os.getcwd()
include_path = Path(cwd_path + "/tests/coverage/included_path/")

install(include_paths=[include_path])

# Import a module that will exercise various code patterns that may generate synthetic opcodes
from tests.coverage.included_path.synthetic_opcodes_module import test_comprehensions
from tests.coverage.included_path.synthetic_opcodes_module import test_lambda
from tests.coverage.included_path.synthetic_opcodes_module import test_nested_functions

ModuleCodeCollector.start_coverage()

# Execute code that generates synthetic opcodes
result = test_comprehensions([1, 2, 3, 4, 5])
assert result == [2, 4, 6, 8, 10]

result = test_lambda(5)
assert result == 25

result = test_nested_functions(10)
assert result == 40 # outer(10) -> inner(10) -> innermost(10) * 3 + 10 = 30 + 10 = 40

ModuleCodeCollector.stop_coverage()

# Verify we got coverage data without crashing
lines = ModuleCodeCollector._instance.lines

# Check that we tracked the module (path may vary, just check it exists)
module_tracked = any("synthetic_opcodes_module.py" in path for path in lines.keys())
assert module_tracked, f"Module not tracked. Found paths: {list(lines.keys())}"

covered = ModuleCodeCollector._instance._get_covered_lines(include_imported=False)
module_covered = any("synthetic_opcodes_module.py" in path for path in covered.keys())
assert module_covered, f"Module not in covered. Found paths: {list(covered.keys())}"
# The important thing is that we didn't crash with TypeError


@pytest.mark.skipif(sys.version_info < (3, 12), reason="Test specific to Python 3.12+ monitoring API")
@pytest.mark.subprocess(
env={
"DD_CIVISIBILITY_AGENTLESS_ENABLED": "1",
"DD_API_KEY": "foobar.baz",
}
)
def test_coverage_with_complex_expressions():
"""Test coverage with complex expressions that may have None line numbers."""
import os
from pathlib import Path

from ddtrace.internal.coverage.code import ModuleCodeCollector
from ddtrace.internal.coverage.installer import install

cwd_path = os.getcwd()
include_path = Path(cwd_path + "/tests/coverage/included_path/")

install(include_paths=[include_path])

ModuleCodeCollector.start_coverage()

# Create complex expressions inline that might generate synthetic opcodes
# Dictionary comprehension
result = {k: v * 2 for k, v in {"a": 1, "b": 2, "c": 3}.items()}
assert result == {"a": 2, "b": 4, "c": 6}

# Set comprehension
result = {x * 2 for x in range(5)}
assert result == {0, 2, 4, 6, 8}

# Generator expression (consumed)
result = list(x * 2 for x in range(5))
assert result == [0, 2, 4, 6, 8]

# Nested comprehension
result = [[y * 2 for y in range(3)] for x in range(2)]
assert result == [[0, 2, 4], [0, 2, 4]]

# Lambda with complex expression
func = lambda x: (x + 1) * (x + 2) if x > 0 else 0 # noqa: E731
assert func(3) == 20

ModuleCodeCollector.stop_coverage()

# Verify we didn't crash
covered = ModuleCodeCollector._instance._get_covered_lines(include_imported=False)
assert isinstance(covered, dict)
# If we got here without TypeError, we successfully handled None line numbers


@pytest.mark.skipif(sys.version_info < (3, 12), reason="Test specific to Python 3.12+ monitoring API")
@pytest.mark.subprocess(
env={
"DD_CIVISIBILITY_AGENTLESS_ENABLED": "1",
"DD_API_KEY": "foobar.baz",
}
)
def test_coverage_import_with_comprehensions():
"""
Test that import-time comprehensions don't crash coverage.

When a module has comprehensions at module level, Python 3.12+ may generate
synthetic opcodes with None line numbers during import. This test ensures
we handle that gracefully.
"""
import os
from pathlib import Path

from ddtrace.internal.coverage.code import ModuleCodeCollector
from ddtrace.internal.coverage.installer import install

cwd_path = os.getcwd()
include_path = Path(cwd_path + "/tests/coverage/included_path/")

# Install coverage BEFORE importing the module
install(include_paths=[include_path])

# This import will execute module-level code that may have None line numbers
# If the bug exists, this will crash with TypeError
from tests.coverage.included_path.synthetic_opcodes_module import MODULE_LEVEL_COMP

# If we got here, we successfully handled None line numbers
assert MODULE_LEVEL_COMP == [0, 3, 6, 9, 12]

# Verify coverage was collected
lines = ModuleCodeCollector._instance.lines
module_tracked = any("synthetic_opcodes_module.py" in path for path in lines.keys())
assert module_tracked, f"Module not tracked during import. Found paths: {list(lines.keys())}"
# If we got here without TypeError, we successfully handled all import-time opcodes
Loading