diff --git a/fs/tarfs.py b/fs/tarfs.py index 8729027d..7577c2b3 100644 --- a/fs/tarfs.py +++ b/fs/tarfs.py @@ -318,7 +318,11 @@ def _directory_entries(self): return self._directory_cache def _follow_symlink(self, entry): - """Follow an symlink `TarInfo` to find a concrete entry.""" + """Follow an symlink `TarInfo` to find a concrete entry. + + Returns ``None`` if the symlink is dangling. + """ + done = set() _entry = entry while _entry.issym(): linkname = normpath( @@ -326,17 +330,21 @@ def _follow_symlink(self, entry): ) resolved = self._resolve(linkname) if resolved is None: - raise errors.ResourceNotFound(linkname) + return None + done.add(_entry) _entry = self._directory_entries[resolved] + # if we already saw this symlink, then we are following cyclic + # symlinks and we should break the loop + if _entry in done: + return None return _entry def _resolve(self, path): """Replace path components that are symlinks with concrete components. - Returns: - - + Returns ``None`` when the path could not be resolved to an existing + entry in the archive. """ if path in self._directory_entries or not path: return path @@ -406,7 +414,7 @@ def getinfo(self, path, namespaces=None): target = normpath(join( dirname(self._decode(member.name)), self._decode(member.linkname), - )) # type: Option[Text] + )) # type: Optional[Text] else: target = None raw_info["link"] = {"target": target} @@ -441,8 +449,8 @@ def isdir(self, path): _path = relpath(self.validatepath(path)) realpath = self._resolve(_path) if realpath is not None: - entry = self._directory_entries[realpath] - return self._follow_symlink(entry).isdir() + entry = self._follow_symlink(self._directory_entries[realpath]) + return False if entry is None else entry.isdir() else: return False @@ -450,8 +458,8 @@ def isfile(self, path): _path = relpath(self.validatepath(path)) realpath = self._resolve(_path) if realpath is not None: - entry = self._directory_entries[realpath] - return self._follow_symlink(entry).isfile() + entry = self._follow_symlink(self._directory_entries[realpath]) + return False if entry is None else entry.isfile() else: return False @@ -480,12 +488,12 @@ def listdir(self, path): elif realpath: target = self._follow_symlink(self._directory_entries[realpath]) # check the path is either a symlink mapping to a directory or a directory - if target.isdir(): - base = target.name - elif target.issym(): - base = target.linkname - else: + if target is None: + raise errors.ResourceNotFound(path) + elif not target.isdir(): raise errors.DirectoryExpected(path) + else: + base = target.name else: base = "" @@ -515,11 +523,16 @@ def openbin(self, path, mode="r", buffering=-1, **options): if "w" in mode or "+" in mode or "a" in mode: raise errors.ResourceReadOnly(path) - # check the path actually resolves after following symlinks + # check the path actually resolves after following symlink components _realpath = self._resolve(_path) if _realpath is None: raise errors.ResourceNotFound(path) + # get the entry at the resolved path and follow all symlinks + entry = self._follow_symlink(self._directory_entries[_realpath]) + if entry is None: + raise errors.ResourceNotFound(path) + # TarFile.extractfile returns None if the entry is not a file # neither a file nor a symlink reader = self._tar.extractfile(self._directory_entries[_realpath]) diff --git a/tests/test_tarfs.py b/tests/test_tarfs.py index 3e6bab72..2ebced5a 100644 --- a/tests/test_tarfs.py +++ b/tests/test_tarfs.py @@ -14,7 +14,7 @@ from fs.compress import write_tar from fs.opener import open_fs from fs.opener.errors import NotWriteable -from fs.errors import NoURL +from fs.errors import NoURL, ResourceNotFound from fs.test import FSTestCases from .test_archives import ArchiveTestCases @@ -415,6 +415,54 @@ def test_listdir(self): self.assertEqual(sorted(self.fs.listdir("spam")), ["bar.txt", "baz.txt"]) +class TestBrokenSymlinks(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.tmpfs = open_fs("temp://") + + @classmethod + def tearDownClass(cls): + cls.tmpfs.close() + + def setUp(self): + def _info(name, **kwargs): + info = tarfile.TarInfo(name) + for k, v in kwargs.items(): + setattr(info, k, v) + return info + + # /foo + # /foo/baz.txt -> /foo/bar.txt + # /spam -> /eggs + # /eggs -> /spam + + self.tempfile = self.tmpfs.open("test.tar", "wb+") + with tarfile.open(mode="w", fileobj=self.tempfile) as tf: + tf.addfile(_info("foo", type=tarfile.DIRTYPE)) + tf.addfile(_info("foo/baz.txt", type=tarfile.SYMTYPE, linkname="bar.txt")) + tf.addfile(_info("spam", type=tarfile.SYMTYPE, linkname="eggs")) + tf.addfile(_info("eggs", type=tarfile.SYMTYPE, linkname="spam")) + self.tempfile.seek(0) + self.fs = tarfs.TarFS(self.tempfile) + + def tearDown(self): + self.fs.close() + self.tempfile.close() + + def test_dangling(self): + self.assertFalse(self.fs.isfile("foo/baz.txt")) + self.assertFalse(self.fs.isdir("foo/baz.txt")) + self.assertRaises(ResourceNotFound, self.fs.openbin, "foo/baz.txt") + self.assertRaises(ResourceNotFound, self.fs.listdir, "foo/baz.txt") + + def test_cyclic(self): + self.assertFalse(self.fs.isfile("spam")) + self.assertFalse(self.fs.isdir("spam")) + self.assertRaises(ResourceNotFound, self.fs.openbin, "spam") + self.assertRaises(ResourceNotFound, self.fs.listdir, "spam") + + class TestReadTarFSMem(TestReadTarFS): def make_source_fs(self): return open_fs("mem://")