Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add type hints, fix mypy issues (#198) #228

Merged
merged 15 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions CHANGES → CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
=======
CHANGES
=======

4.x - Unreleased
================
Unreleased
===========
- Add type hints. ([#228](https://github.com/mozilla/django-csp/pull/228))

4.0b1
=====
BACKWARDS INCOMPATIBLE changes:
- Move to dict-based configuration which allows for setting policies for both enforced and
report-only. See the migration guide in the docs for migrating your settings.
Expand Down
6 changes: 4 additions & 2 deletions csp/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,7 @@
class CspConfig(AppConfig):
name = "csp"

def ready(self):
checks.register(check_django_csp_lt_4_0, checks.Tags.security)
def ready(self) -> None:
# Ignore known issue typeddjango/django-stubs #2232
# The overload of CheckRegistry.register as a function is incomplete
checks.register(check_django_csp_lt_4_0, checks.Tags.security) # type: ignore
25 changes: 15 additions & 10 deletions csp/checks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from __future__ import annotations
import pprint
from typing import Dict, Tuple, Any, Optional, Sequence, TYPE_CHECKING, List

from django.conf import settings
from django.core.checks import Error

from csp.constants import NONCE

if TYPE_CHECKING:
from django.apps.config import AppConfig


OUTDATED_SETTINGS = [
"CSP_CHILD_SRC",
Expand Down Expand Up @@ -40,21 +45,21 @@
]


def migrate_settings():
def migrate_settings() -> Tuple[Dict[str, Any], bool]:
# This function is used to migrate settings from the old format to the new format.
config = {
config: Dict[str, Any] = {
"DIRECTIVES": {},
}
REPORT_ONLY = False

if hasattr(settings, "CSP_REPORT_ONLY"):
REPORT_ONLY = settings.CSP_REPORT_ONLY
REPORT_ONLY = getattr(settings, "CSP_REPORT_ONLY", False)

if hasattr(settings, "CSP_EXCLUDE_URL_PREFIXES"):
config["EXCLUDE_URL_PREFIXES"] = settings.CSP_EXCLUDE_URL_PREFIXES
_EXCLUDE_URL_PREFIXES = getattr(settings, "CSP_EXCLUDE_URL_PREFIXES", None)
if _EXCLUDE_URL_PREFIXES is not None:
config["EXCLUDE_URL_PREFIXES"] = _EXCLUDE_URL_PREFIXES

if hasattr(settings, "CSP_REPORT_PERCENTAGE"):
config["REPORT_PERCENTAGE"] = round(settings.CSP_REPORT_PERCENTAGE * 100)
_REPORT_PERCENTAGE = getattr(settings, "CSP_REPORT_PERCENTAGE", None)
if _REPORT_PERCENTAGE is not None:
config["REPORT_PERCENTAGE"] = round(_REPORT_PERCENTAGE * 100)

include_nonce_in = getattr(settings, "CSP_INCLUDE_NONCE_IN", [])

Expand All @@ -70,7 +75,7 @@ def migrate_settings():
return config, REPORT_ONLY


def check_django_csp_lt_4_0(app_configs, **kwargs):
def check_django_csp_lt_4_0(app_configs: Optional[Sequence[AppConfig]], **kwargs: Any) -> List[Error]:
check_settings = OUTDATED_SETTINGS + ["CSP_REPORT_ONLY", "CSP_EXCLUDE_URL_PREFIXES", "CSP_REPORT_PERCENTAGE"]
if any(hasattr(settings, setting) for setting in check_settings):
# Try to build the new config.
Expand Down
6 changes: 4 additions & 2 deletions csp/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any, Type

HEADER = "Content-Security-Policy"
HEADER_REPORT_ONLY = "Content-Security-Policy-Report-Only"

Expand All @@ -15,12 +17,12 @@
class Nonce:
_instance = None

def __new__(cls, *args, **kwargs):
def __new__(cls: Type["Nonce"], *args: Any, **kwargs: Any) -> "Nonce":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the way to write this is:

def __new__(cls, *args: Any, **kwargs: Any) -> Self:

(Self is importable from typing_extensions for the pythons supported here)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this seems to work. Do you want to submit the PR?

if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance

def __repr__(self):
def __repr__(self) -> str:
return "csp.constants.NONCE"


Expand Down
9 changes: 8 additions & 1 deletion csp/context_processors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
def nonce(request):
from __future__ import annotations
from typing import Dict, Literal, TYPE_CHECKING

if TYPE_CHECKING:
from django.http import HttpRequest


def nonce(request: HttpRequest) -> Dict[Literal["CSP_NONCE"], str]:
nonce = request.csp_nonce if hasattr(request, "csp_nonce") else ""

return {"CSP_NONCE": nonce}
9 changes: 7 additions & 2 deletions csp/contrib/rate_limiting.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import random

from django.conf import settings

from csp.middleware import CSPMiddleware
from csp.utils import build_policy

if TYPE_CHECKING:
from django.http import HttpRequest, HttpResponseBase


class RateLimitedCSPMiddleware(CSPMiddleware):
"""A CSP middleware that rate-limits the number of violation reports sent
to report-uri by excluding it from some requests."""

def build_policy(self, request, response):
def build_policy(self, request: HttpRequest, response: HttpResponseBase) -> str:
config = getattr(response, "_csp_config", None)
update = getattr(response, "_csp_update", None)
replace = getattr(response, "_csp_replace", {})
Expand All @@ -28,7 +33,7 @@ def build_policy(self, request, response):

return build_policy(config=config, update=update, replace=replace, nonce=nonce)

def build_policy_ro(self, request, response):
def build_policy_ro(self, request: HttpRequest, response: HttpResponseBase) -> str:
config = getattr(response, "_csp_config_ro", None)
update = getattr(response, "_csp_update_ro", None)
replace = getattr(response, "_csp_replace_ro", {})
Expand Down
55 changes: 34 additions & 21 deletions csp/decorators.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
from __future__ import annotations

from functools import wraps
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional

if TYPE_CHECKING:
from django.http import HttpRequest, HttpResponseBase

# A generic Django view function
_VIEW_T = Callable[[HttpRequest], HttpResponseBase]
_VIEW_DECORATOR_T = Callable[[_VIEW_T], _VIEW_T]


def csp_exempt(REPORT_ONLY=None):
def csp_exempt(REPORT_ONLY: Optional[bool] = None) -> _VIEW_DECORATOR_T:
if callable(REPORT_ONLY):
raise RuntimeError(
"Incompatible `csp_exempt` decorator usage. This decorator now requires arguments, "
Expand All @@ -10,14 +20,14 @@ def csp_exempt(REPORT_ONLY=None):
"information."
)

def decorator(f):
def decorator(f: _VIEW_T) -> _VIEW_T:
@wraps(f)
def _wrapped(*a, **kw):
def _wrapped(*a: Any, **kw: Any) -> HttpResponseBase:
resp = f(*a, **kw)
if REPORT_ONLY:
resp._csp_exempt_ro = True
setattr(resp, "_csp_exempt_ro", True)
else:
resp._csp_exempt = True
setattr(resp, "_csp_exempt", True)
return resp

return _wrapped
Expand All @@ -32,58 +42,61 @@ def _wrapped(*a, **kw):
)


def csp_update(config=None, REPORT_ONLY=False, **kwargs):
def csp_update(config: Optional[Dict[str, Any]] = None, REPORT_ONLY: bool = False, **kwargs: Any) -> _VIEW_DECORATOR_T:
if config is None and kwargs:
raise RuntimeError(DECORATOR_DEPRECATION_ERROR.format(fname="csp_update"))

def decorator(f):
def decorator(f: _VIEW_T) -> _VIEW_T:
@wraps(f)
def _wrapped(*a, **kw):
def _wrapped(*a: Any, **kw: Any) -> HttpResponseBase:
resp = f(*a, **kw)
if REPORT_ONLY:
resp._csp_update_ro = config
setattr(resp, "_csp_update_ro", config)
else:
resp._csp_update = config
setattr(resp, "_csp_update", config)
return resp

return _wrapped

return decorator


def csp_replace(config=None, REPORT_ONLY=False, **kwargs):
def csp_replace(config: Optional[Dict[str, Any]] = None, REPORT_ONLY: bool = False, **kwargs: Any) -> _VIEW_DECORATOR_T:
if config is None and kwargs:
raise RuntimeError(DECORATOR_DEPRECATION_ERROR.format(fname="csp_replace"))

def decorator(f):
def decorator(f: _VIEW_T) -> _VIEW_T:
@wraps(f)
def _wrapped(*a, **kw):
def _wrapped(*a: Any, **kw: Any) -> HttpResponseBase:
resp = f(*a, **kw)
if REPORT_ONLY:
resp._csp_replace_ro = config
setattr(resp, "_csp_replace_ro", config)
else:
resp._csp_replace = config
setattr(resp, "_csp_replace", config)
return resp

return _wrapped

return decorator


def csp(config=None, REPORT_ONLY=False, **kwargs):
def csp(config: Optional[Dict[str, Any]] = None, REPORT_ONLY: bool = False, **kwargs: Any) -> _VIEW_DECORATOR_T:
if config is None and kwargs:
raise RuntimeError(DECORATOR_DEPRECATION_ERROR.format(fname="csp"))

config = {k: [v] if isinstance(v, str) else v for k, v in config.items()}
if config is None:
processed_config: Dict[str, List[Any]] = {}
else:
processed_config = {k: [v] if isinstance(v, str) else v for k, v in config.items()}

def decorator(f):
def decorator(f: _VIEW_T) -> _VIEW_T:
@wraps(f)
def _wrapped(*a, **kw):
def _wrapped(*a: Any, **kw: Any) -> HttpResponseBase:
resp = f(*a, **kw)
if REPORT_ONLY:
resp._csp_config_ro = config
setattr(resp, "_csp_config_ro", processed_config)
else:
resp._csp_config = config
setattr(resp, "_csp_config", processed_config)
return resp

return _wrapped
Expand Down
12 changes: 9 additions & 3 deletions csp/extensions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
from __future__ import annotations
from typing import Callable, TYPE_CHECKING, Any

from jinja2 import nodes
from jinja2.ext import Extension

from csp.utils import SCRIPT_ATTRS, build_script_tag

if TYPE_CHECKING:
from jinja2.parser import Parser


class NoncedScript(Extension):
# a set of names that trigger the extension.
tags = {"script"}

def parse(self, parser):
def parse(self, parser: Parser) -> nodes.Node:
# the first token is the token that started the tag. In our case
# we only listen to ``'script'`` so this will be a name token with
# `script` as value. We get the line number so that we can give
Expand All @@ -26,13 +32,13 @@ def parse(self, parser):

# now we parse the body of the script block up to `endscript` and
# drop the needle (which would always be `endscript` in that case)
body = parser.parse_statements(["name:endscript"], drop_needle=True)
body = parser.parse_statements(("name:endscript",), drop_needle=True)

# now return a `CallBlock` node that calls our _render_script
# helper method on this extension.
return nodes.CallBlock(self.call_method("_render_script", kwargs=kwargs), [], [], body).set_lineno(lineno)

def _render_script(self, caller, **kwargs):
def _render_script(self, caller: Callable[[], str], **kwargs: Any) -> str:
ctx = kwargs.pop("ctx")
request = ctx.get("request")
kwargs["nonce"] = request.csp_nonce
Expand Down
36 changes: 23 additions & 13 deletions csp/middleware.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations
import base64
import http.client as http_client
import os
from functools import partial
from typing import TYPE_CHECKING

from django.conf import settings
from django.utils.deprecation import MiddlewareMixin
Expand All @@ -10,6 +12,9 @@
from csp.constants import HEADER, HEADER_REPORT_ONLY
from csp.utils import build_policy

if TYPE_CHECKING:
from django.http import HttpRequest, HttpResponseBase


class CSPMiddleware(MiddlewareMixin):
"""
Expand All @@ -21,17 +26,20 @@ class CSPMiddleware(MiddlewareMixin):

"""

def _make_nonce(self, request):
def _make_nonce(self, request: HttpRequest) -> str:
# Ensure that any subsequent calls to request.csp_nonce return the same value
if not getattr(request, "_csp_nonce", None):
request._csp_nonce = base64.b64encode(os.urandom(16)).decode("ascii")
return request._csp_nonce
stored_nonce = getattr(request, "_csp_nonce", None)
if isinstance(stored_nonce, str):
return stored_nonce
nonce = base64.b64encode(os.urandom(16)).decode("ascii")
setattr(request, "_csp_nonce", nonce)
return nonce

def process_request(self, request):
def process_request(self, request: HttpRequest) -> None:
nonce = partial(self._make_nonce, request)
request.csp_nonce = SimpleLazyObject(nonce)
setattr(request, "csp_nonce", SimpleLazyObject(nonce))

def process_response(self, request, response):
def process_response(self, request: HttpRequest, response: HttpResponseBase) -> HttpResponseBase:
# Check for debug view
exempted_debug_codes = (
http_client.INTERNAL_SERVER_ERROR,
Expand All @@ -45,8 +53,9 @@ def process_response(self, request, response):
# Only set header if not already set and not an excluded prefix and not exempted.
is_not_exempt = getattr(response, "_csp_exempt", False) is False
no_header = HEADER not in response
prefixes = getattr(settings, "CONTENT_SECURITY_POLICY", {}).get("EXCLUDE_URL_PREFIXES", ())
is_not_excluded = not request.path_info.startswith(prefixes)
policy = getattr(settings, "CONTENT_SECURITY_POLICY", None) or {}
prefixes = policy.get("EXCLUDE_URL_PREFIXES", None) or ()
is_not_excluded = not request.path_info.startswith(tuple(prefixes))
if all((no_header, is_not_exempt, is_not_excluded)):
response[HEADER] = csp

Expand All @@ -55,21 +64,22 @@ def process_response(self, request, response):
# Only set header if not already set and not an excluded prefix and not exempted.
is_not_exempt = getattr(response, "_csp_exempt_ro", False) is False
no_header = HEADER_REPORT_ONLY not in response
prefixes = getattr(settings, "CONTENT_SECURITY_POLICY_REPORT_ONLY", {}).get("EXCLUDE_URL_PREFIXES", ())
is_not_excluded = not request.path_info.startswith(prefixes)
policy = getattr(settings, "CONTENT_SECURITY_POLICY_REPORT_ONLY", None) or {}
prefixes = policy.get("EXCLUDE_URL_PREFIXES", None) or ()
is_not_excluded = not request.path_info.startswith(tuple(prefixes))
if all((no_header, is_not_exempt, is_not_excluded)):
response[HEADER_REPORT_ONLY] = csp_ro

return response

def build_policy(self, request, response):
def build_policy(self, request: HttpRequest, response: HttpResponseBase) -> str:
config = getattr(response, "_csp_config", None)
update = getattr(response, "_csp_update", None)
replace = getattr(response, "_csp_replace", None)
nonce = getattr(request, "_csp_nonce", None)
return build_policy(config=config, update=update, replace=replace, nonce=nonce)

def build_policy_ro(self, request, response):
def build_policy_ro(self, request: HttpRequest, response: HttpResponseBase) -> str:
config = getattr(response, "_csp_config_ro", None)
update = getattr(response, "_csp_update_ro", None)
replace = getattr(response, "_csp_replace_ro", None)
Expand Down
Empty file added csp/py.typed
Empty file.
Loading