Skip to content

Commit

Permalink
Update ReadTarFS to work on cyclic and dangling symlinks
Browse files Browse the repository at this point in the history
  • Loading branch information
althonos committed Sep 19, 2020
1 parent a49deed commit 8a07ecd
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 17 deletions.
45 changes: 29 additions & 16 deletions fs/tarfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,25 +318,33 @@ def _directory_entries(self):
return self._directory_cache

def _follow_symlink(self, entry):
"""Follow an symlink `TarInfo` to find a concrete entry."""
"""Follow an symlink `TarInfo` to find a concrete entry.
Returns ``None`` if the symlink is dangling.
"""
done = set()
_entry = entry
while _entry.issym():
linkname = normpath(
join(dirname(self._decode(_entry.name)), self._decode(_entry.linkname))
)
resolved = self._resolve(linkname)
if resolved is None:
raise errors.ResourceNotFound(linkname)
return None
done.add(_entry)
_entry = self._directory_entries[resolved]
# if we already saw this symlink, then we are following cyclic
# symlinks and we should break the loop
if _entry in done:
return None

return _entry

def _resolve(self, path):
"""Replace path components that are symlinks with concrete components.
Returns:
Returns ``None`` when the path could not be resolved to an existing
entry in the archive.
"""
if path in self._directory_entries or not path:
return path
Expand Down Expand Up @@ -406,7 +414,7 @@ def getinfo(self, path, namespaces=None):
target = normpath(join(
dirname(self._decode(member.name)),
self._decode(member.linkname),
)) # type: Option[Text]
)) # type: Optional[Text]
else:
target = None
raw_info["link"] = {"target": target}
Expand Down Expand Up @@ -441,17 +449,17 @@ def isdir(self, path):
_path = relpath(self.validatepath(path))
realpath = self._resolve(_path)
if realpath is not None:
entry = self._directory_entries[realpath]
return self._follow_symlink(entry).isdir()
entry = self._follow_symlink(self._directory_entries[realpath])
return False if entry is None else entry.isdir()
else:
return False

def isfile(self, path):
_path = relpath(self.validatepath(path))
realpath = self._resolve(_path)
if realpath is not None:
entry = self._directory_entries[realpath]
return self._follow_symlink(entry).isfile()
entry = self._follow_symlink(self._directory_entries[realpath])
return False if entry is None else entry.isfile()
else:
return False

Expand Down Expand Up @@ -480,12 +488,12 @@ def listdir(self, path):
elif realpath:
target = self._follow_symlink(self._directory_entries[realpath])
# check the path is either a symlink mapping to a directory or a directory
if target.isdir():
base = target.name
elif target.issym():
base = target.linkname
else:
if target is None:
raise errors.ResourceNotFound(path)
elif not target.isdir():
raise errors.DirectoryExpected(path)
else:
base = target.name
else:
base = ""

Expand Down Expand Up @@ -515,11 +523,16 @@ def openbin(self, path, mode="r", buffering=-1, **options):
if "w" in mode or "+" in mode or "a" in mode:
raise errors.ResourceReadOnly(path)

# check the path actually resolves after following symlinks
# check the path actually resolves after following symlink components
_realpath = self._resolve(_path)
if _realpath is None:
raise errors.ResourceNotFound(path)

# get the entry at the resolved path and follow all symlinks
entry = self._follow_symlink(self._directory_entries[_realpath])
if entry is None:
raise errors.ResourceNotFound(path)

# TarFile.extractfile returns None if the entry is not a file
# neither a file nor a symlink
reader = self._tar.extractfile(self._directory_entries[_realpath])
Expand Down
50 changes: 49 additions & 1 deletion tests/test_tarfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from fs.compress import write_tar
from fs.opener import open_fs
from fs.opener.errors import NotWriteable
from fs.errors import NoURL
from fs.errors import NoURL, ResourceNotFound
from fs.test import FSTestCases

from .test_archives import ArchiveTestCases
Expand Down Expand Up @@ -415,6 +415,54 @@ def test_listdir(self):
self.assertEqual(sorted(self.fs.listdir("spam")), ["bar.txt", "baz.txt"])


class TestBrokenSymlinks(unittest.TestCase):

@classmethod
def setUpClass(cls):
cls.tmpfs = open_fs("temp://")

@classmethod
def tearDownClass(cls):
cls.tmpfs.close()

def setUp(self):
def _info(name, **kwargs):
info = tarfile.TarInfo(name)
for k, v in kwargs.items():
setattr(info, k, v)
return info

# /foo
# /foo/baz.txt -> /foo/bar.txt
# /spam -> /eggs
# /eggs -> /spam

self.tempfile = self.tmpfs.open("test.tar", "wb+")
with tarfile.open(mode="w", fileobj=self.tempfile) as tf:
tf.addfile(_info("foo", type=tarfile.DIRTYPE))
tf.addfile(_info("foo/baz.txt", type=tarfile.SYMTYPE, linkname="bar.txt"))
tf.addfile(_info("spam", type=tarfile.SYMTYPE, linkname="eggs"))
tf.addfile(_info("eggs", type=tarfile.SYMTYPE, linkname="spam"))
self.tempfile.seek(0)
self.fs = tarfs.TarFS(self.tempfile)

def tearDown(self):
self.fs.close()
self.tempfile.close()

def test_dangling(self):
self.assertFalse(self.fs.isfile("foo/baz.txt"))
self.assertFalse(self.fs.isdir("foo/baz.txt"))
self.assertRaises(ResourceNotFound, self.fs.openbin, "foo/baz.txt")
self.assertRaises(ResourceNotFound, self.fs.listdir, "foo/baz.txt")

def test_cyclic(self):
self.assertFalse(self.fs.isfile("spam"))
self.assertFalse(self.fs.isdir("spam"))
self.assertRaises(ResourceNotFound, self.fs.openbin, "spam")
self.assertRaises(ResourceNotFound, self.fs.listdir, "spam")


class TestReadTarFSMem(TestReadTarFS):
def make_source_fs(self):
return open_fs("mem://")
Expand Down

0 comments on commit 8a07ecd

Please sign in to comment.