Skip to content

Commit

Permalink
Merge pull request from GHSA-83pv-qr33-2vcf
Browse files Browse the repository at this point in the history
Fix for vulnerability introduced in #739, caused by failure to normalize the path part extracted from URL before serving data from a static directory.

See GHSA-83pv-qr33-2vcf for specific details.

Co-authored-by: Jacob Coffee <[email protected]>
Co-authored-by: Brian Edgar Ré <[email protected]>
  • Loading branch information
3 people authored May 6, 2024
1 parent 6c35d84 commit 979453a
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 8 deletions.
20 changes: 15 additions & 5 deletions litestar/static_files/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# ruff: noqa: PTH118
from __future__ import annotations

from os.path import commonpath
import os.path
from pathlib import Path
from typing import TYPE_CHECKING, Literal, Sequence

Expand All @@ -12,7 +13,6 @@

__all__ = ("StaticFiles",)


if TYPE_CHECKING:
from litestar.types import Receive, Scope, Send
from litestar.types.composite_types import PathType
Expand Down Expand Up @@ -45,7 +45,9 @@ def __init__(
headers: Headers that will be sent with every response.
"""
self.adapter = FileSystemAdapter(file_system)
self.directories = tuple(Path(p).resolve() if resolve_symlinks else Path(p) for p in directories)
self.directories = tuple(
os.path.normpath(Path(p).resolve() if resolve_symlinks else Path(p)) for p in directories
)
self.is_html_mode = is_html_mode
self.send_as_attachment = send_as_attachment
self.headers = headers
Expand All @@ -55,6 +57,12 @@ async def get_fs_info(
) -> tuple[Path, FileInfo] | tuple[None, None]:
"""Return the resolved path and a :class:`stat_result <os.stat_result>`.
.. versionchanged:: 2.7.2
Prevent `CVE-2024-32982 <https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-32982>`_
by ensuring that the resolved path is within the configured directory as part of `advisory
GHSA-83pv-qr33-2vcf <https://github.com/advisories/GHSA-83pv-qr33-2vcf>`_.
Args:
directories: A list of directory paths.
file_path: A file path to resolve
Expand All @@ -66,8 +74,10 @@ async def get_fs_info(
for directory in directories:
try:
joined_path = Path(directory, file_path)
file_info = await self.adapter.info(joined_path)
if file_info and commonpath([str(directory), file_info["name"], joined_path]) == str(directory):
normalized_file_path = os.path.normpath(joined_path)
if os.path.commonpath([directory, normalized_file_path]) == str(directory) and (
file_info := await self.adapter.info(joined_path)
):
return joined_path, file_info
except FileNotFoundError:
continue
Expand Down
48 changes: 48 additions & 0 deletions tests/e2e/test_routing/test_path_resolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,3 +360,51 @@ async def pathfinder(path: Optional[Path]) -> str:

assert httpx.get("http://127.0.0.1:9999/").text == "None"
assert httpx.get("http://127.0.0.1:9999/something").text == "/something"


@pytest.mark.parametrize(
"server_command",
[
pytest.param(["uvicorn", "app:app", "--port", "9999", "--root-path", "/test"], id="uvicorn"),
pytest.param(["hypercorn", "app:app", "--bind", "127.0.0.1:9999", "--root-path", "/test"], id="hypercorn"),
pytest.param(["daphne", "app:app", "--port", "9999", "--root-path", "/test"], id="daphne"),
],
)
@pytest.mark.xdist_group("live_server_test")
@pytest.mark.server_integration
def test_no_path_traversal_from_static_directory(
tmp_path: Path, monkeypatch: MonkeyPatch, server_command: List[str], run_server: Callable[[str, List[str]], None]
) -> None:
import http.client

static = tmp_path / "static"
static.mkdir()
(static / "index.html").write_text("Hello, World!")

app = """
from pathlib import Path
from litestar import Litestar
from litestar.static_files import create_static_files_router
import uvicorn
app = Litestar(
route_handlers=[
create_static_files_router(path="/static", directories=["static"]),
],
)
"""

def send_request(host: str, port: int, path: str) -> http.client.HTTPResponse:
connection = http.client.HTTPConnection(host, port)
connection.request("GET", path)
resp = connection.getresponse()
connection.close()
return resp

run_server(app, server_command)

response = send_request("127.0.0.1", 9999, "/static/index.html")
assert response.status == 200

response = send_request("127.0.0.1", 9999, "/static/../app.py")
assert response.status == 404
30 changes: 29 additions & 1 deletion tests/unit/test_static_files/test_file_serving_resolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from typing_extensions import TypeAlias

from litestar import MediaType, Router, get
from litestar.static_files import StaticFilesConfig, create_static_files_router
from litestar.static_files import StaticFiles, StaticFilesConfig, create_static_files_router
from litestar.status_codes import HTTP_200_OK
from litestar.testing import create_test_client
from tests.unit.test_static_files.conftest import MakeConfig
Expand Down Expand Up @@ -295,3 +295,31 @@ def test_resolve_symlinks(tmp_path: Path, resolve: bool) -> None:
assert client.get("/test.txt").status_code == 404
else:
assert client.get("/test.txt").status_code == 200


async def test_staticfiles_get_fs_info_no_access_to_non_static_directory(
tmp_path: Path,
file_system: FileSystemProtocol,
) -> None:
assets = tmp_path / "assets"
assets.mkdir()
index = tmp_path / "index.html"
index.write_text("content", "utf-8")
static_files = StaticFiles(is_html_mode=False, directories=[assets], file_system=file_system)
path, info = await static_files.get_fs_info([assets], "../index.html")
assert path is None
assert info is None


async def test_staticfiles_get_fs_info_no_access_to_non_static_file_with_prefix(
tmp_path: Path,
file_system: FileSystemProtocol,
) -> None:
static = tmp_path / "static"
static.mkdir()
private_file = tmp_path / "staticsecrets.env"
private_file.write_text("content", "utf-8")
static_files = StaticFiles(is_html_mode=False, directories=[static], file_system=file_system)
path, info = await static_files.get_fs_info([static], "../staticsecrets.env")
assert path is None
assert info is None
40 changes: 38 additions & 2 deletions tests/unit/test_static_files/test_static_files_validation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import TYPE_CHECKING, Any, List
import asyncio
from pathlib import Path, PosixPath
from typing import TYPE_CHECKING, Any, List, cast

import pytest

Expand All @@ -9,7 +11,7 @@
from litestar.testing import create_test_client

if TYPE_CHECKING:
from pathlib import Path
from litestar.static_files import StaticFiles


@pytest.mark.parametrize("directories", [[], [""]])
Expand Down Expand Up @@ -113,3 +115,37 @@ def test_runtime_validation_of_request_method_create_handler(tmpdir: "Path", met
with create_test_client(create_static_files_router(path="/static", directories=[tmpdir])) as client:
response = client.request(method, "/static/test.txt")
assert response.status_code == expected


def test_config_validation_of_path_prevents_directory_traversal(tmpdir: "Path") -> None:
# Setup: Create a 'secret.txt' outside the static directory to simulate sensitive file
secret_path = Path(tmpdir) / "../secret.txt"
secret_path.write_text("This is a secret file.", encoding="utf-8")

# Setup: Create 'test.txt' inside the static directory
test_file_path = Path(tmpdir) / "test.txt"
test_file_path.write_text("This is a test file.", encoding="utf-8")

# Get StaticFiles handler
config = StaticFilesConfig(path="/static", directories=[tmpdir])
asgi_router = config.to_static_files_app()
static_files_handler = cast("StaticFiles", asgi_router.fn)

# Resolve file path with the StaticFiles handler
string_path = Path("../secret.txt").as_posix()

coroutine = static_files_handler.get_fs_info(directories=static_files_handler.directories, file_path=string_path)
resolved_path, fs_info = asyncio.run(coroutine)

assert resolved_path is None # Because the resolved path is outside the static directory
assert fs_info is None # Because the file doesn't exist, so there is no info

# Resolve file path with the StaticFiles handler
string_path = Path("test.txt").as_posix()

coroutine = static_files_handler.get_fs_info(directories=static_files_handler.directories, file_path=string_path)
resolved_path, fs_info = asyncio.run(coroutine)

expected_resolved_path = PosixPath(str(tmpdir / "test.txt"))
assert resolved_path == expected_resolved_path # Because the resolved path is inside the static directory
assert fs_info is not None # Because the file exists, so there is info

0 comments on commit 979453a

Please sign in to comment.