Skip to content

Commit

Permalink
Manually resolves links relatively to root_dir, and prevent escape
Browse files Browse the repository at this point in the history
This patch is a followup of #311.

It appeared that we were not resolving paths when reading from files.
This means that a symbolic link present under `root_dir` could be
blindly followed _outside_ of `root_dir`, possibly leading to host
files.
  • Loading branch information
Samuel FORESTIER authored and HorlogeSkynet committed Apr 20, 2024
1 parent 7ce285c commit 1cdfc44
Show file tree
Hide file tree
Showing 14 changed files with 150 additions and 14 deletions.
104 changes: 90 additions & 14 deletions src/distro/distro.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ def __init__(
* :py:exc:`UnicodeError`: A data source has unexpected characters or
uses an unexpected encoding.
"""
self.root_dir = root_dir
self.root_dir = os.path.realpath(root_dir) if root_dir else None
self.etc_dir = os.path.join(root_dir, "etc") if root_dir else _UNIXCONFDIR
self.usr_lib_dir = (
os.path.join(root_dir, "usr/lib") if root_dir else _UNIXUSRLIBDIR
Expand All @@ -773,8 +773,10 @@ def __init__(

# NOTE: The idea is to respect order **and** have it set
# at all times for API backwards compatibility.
if os.path.isfile(etc_dir_os_release_file) or not os.path.isfile(
usr_lib_os_release_file
if (
self.root_dir is not None
or os.path.isfile(etc_dir_os_release_file)
or not os.path.isfile(usr_lib_os_release_file)
):
self.os_release_file = etc_dir_os_release_file
else:
Expand Down Expand Up @@ -1091,6 +1093,66 @@ def uname_attr(self, attribute: str) -> str:
"""
return self._uname_info.get(attribute, "")

@staticmethod
def __abs_path_join(root_path: str, abs_path: str) -> str:
rel_path = os.path.splitdrive(abs_path)[1].lstrip(os.sep)
if os.altsep is not None:
rel_path = rel_path.lstrip(os.altsep)

return os.path.join(root_path, rel_path)

def __resolve_chroot_symlink_as_needed(self, link_location: str) -> str:
"""
Resolves a potential symlink in ``link_location`` against
``self.root_dir`` if inside the chroot, else just return the original
path.
We're doing this check at a central place, to making the calling code
more readable and to de-duplicate.
"""
if self.root_dir is None:
return link_location

# consider non-absolute `link_location` relative to `root_dir` (as
# `os.path.commonpath` does not support mixing absolute and relative
# paths).
if not os.path.isabs(link_location):
link_location = self.__abs_path_join(self.root_dir, link_location)

seen_paths = set()
while True:
# while `link_location` _should_ be relative to chroot (either
# passed from trusted code or already resolved by previous loop
# iteration), we enforce this check as `self.os_release_file` and
# `self.distro_release_file` may be user-supplied.
if os.path.commonpath(
[self.root_dir, link_location]
) != self.root_dir.rstrip(os.sep):
raise FileNotFoundError

if not os.path.islink(link_location):
return link_location

resolved = os.readlink(link_location)
if not os.path.isabs(resolved):
# compute resolved path relatively to previous `link_location`
# and accordingly to chroot. We also canonize "top" `..`
# components (relatively to `self.root_dir`), as they would
# legitimately resolve to chroot itself).
resolved = os.path.relpath(
os.path.join(os.path.dirname(link_location), resolved),
start=self.root_dir,
).lstrip(os.pardir + os.pathsep)

# "move" back absolute path inside the chroot
resolved = self.__abs_path_join(self.root_dir, resolved)

# prevent symlinks infinite loop
if resolved in seen_paths:
raise FileNotFoundError

seen_paths.add(link_location)
link_location = resolved

@cached_property
def _os_release_info(self) -> Dict[str, str]:
"""
Expand All @@ -1099,10 +1161,14 @@ def _os_release_info(self) -> Dict[str, str]:
Returns:
A dictionary containing all information items.
"""
if os.path.isfile(self.os_release_file):
with open(self.os_release_file, encoding="utf-8") as release_file:
try:
with open(
self.__resolve_chroot_symlink_as_needed(self.os_release_file),
encoding="utf-8",
) as release_file:
return self._parse_os_release_content(release_file)
return {}
except OSError:
return {}

@staticmethod
def _parse_os_release_content(lines: TextIO) -> Dict[str, str]:
Expand Down Expand Up @@ -1223,7 +1289,10 @@ def _oslevel_info(self) -> str:
def _debian_version(self) -> str:
try:
with open(
os.path.join(self.etc_dir, "debian_version"), encoding="ascii"
self.__resolve_chroot_symlink_as_needed(
os.path.join(self.etc_dir, "debian_version")
),
encoding="ascii",
) as fp:
return fp.readline().rstrip()
except FileNotFoundError:
Expand All @@ -1233,7 +1302,10 @@ def _debian_version(self) -> str:
def _armbian_version(self) -> str:
try:
with open(
os.path.join(self.etc_dir, "armbian-release"), encoding="ascii"
self.__resolve_chroot_symlink_as_needed(
os.path.join(self.etc_dir, "armbian-release")
),
encoding="ascii",
) as fp:
return self._parse_os_release_content(fp).get("version", "")
except FileNotFoundError:
Expand Down Expand Up @@ -1285,9 +1357,10 @@ def _distro_release_info(self) -> Dict[str, str]:
try:
basenames = [
basename
for basename in os.listdir(self.etc_dir)
for basename in os.listdir(
self.__resolve_chroot_symlink_as_needed(self.etc_dir)
)
if basename not in _DISTRO_RELEASE_IGNORE_BASENAMES
and os.path.isfile(os.path.join(self.etc_dir, basename))
]
# We sort for repeatability in cases where there are multiple
# distro specific files; e.g. CentOS, Oracle, Enterprise all
Expand All @@ -1303,12 +1376,13 @@ def _distro_release_info(self) -> Dict[str, str]:
match = _DISTRO_RELEASE_BASENAME_PATTERN.match(basename)
if match is None:
continue
filepath = os.path.join(self.etc_dir, basename)
distro_info = self._parse_distro_release_file(filepath)
# NOTE: _parse_distro_release_file below will be resolving for us
unresolved_filepath = os.path.join(self.etc_dir, basename)
distro_info = self._parse_distro_release_file(unresolved_filepath)
# The name is always present if the pattern matches.
if "name" not in distro_info:
continue
self.distro_release_file = filepath
self.distro_release_file = unresolved_filepath
break
else: # the loop didn't "break": no candidate.
return {}
Expand Down Expand Up @@ -1342,7 +1416,9 @@ def _parse_distro_release_file(self, filepath: str) -> Dict[str, str]:
A dictionary containing all information items.
"""
try:
with open(filepath, encoding="utf-8") as fp:
with open(
self.__resolve_chroot_symlink_as_needed(filepath), encoding="utf-8"
) as fp:
# Only parse the first line. For instance, on SLES there
# are multiple lines. We don't want them...
return self._parse_distro_release_content(fp.readline())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ID=absolute_symlinks
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ID=root_dir_non_escape
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ID=root_dir_os_release_file_abs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ID=root_dir_os_release_file_rel
49 changes: 49 additions & 0 deletions tests/test_distro.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,55 @@ def test_empty_release(self) -> None:
desired_outcome = {"id": "empty"}
self._test_outcome(desired_outcome)

def test_root_dir_os_release_file_rel(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(
TESTDISTROS, "distro", "root_dir_os_release_file_rel"
),
os_release_file="tmp/os-release",
)
desired_outcome = {"id": "root_dir_os_release_file_rel"}
self._test_outcome(desired_outcome)

def test_root_dir_os_release_file_abs(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(
TESTDISTROS, "distro", "root_dir_os_release_file_abs"
),
os_release_file="/tmp/os-release",
)
# as we honor `os_release_file`, loading existing file outside of root_dir has
# been prevented (empty data)
self._test_outcome({})

def test_root_dir_absolute_symlinks(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_absolute_symlinks")
)
desired_outcome = {"id": "absolute_symlinks"}
self._test_outcome(desired_outcome)

def test_root_dir_escape(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_escape")
)
# loading existing file outside of root_dir has been prevented (empty data)
self._test_outcome({})

def test_root_dir_non_escape(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_non_escape")
)
desired_outcome = {"id": "root_dir_non_escape"}
self._test_outcome(desired_outcome)

def test_root_dir_symlinks_loop(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_symlinks_loop")
)
# due to symbolic links loop, loading of file has been prevented (empty data)
self._test_outcome({})

def test_dontincludeuname(self) -> None:
self._setup_for_distro(os.path.join(TESTDISTROS, "distro", "dontincludeuname"))

Expand Down

0 comments on commit 1cdfc44

Please sign in to comment.