Skip to content

Commit

Permalink
Handle file permissions on deletion
Browse files Browse the repository at this point in the history
- raise on file deletion if missing write permission
- raise on deleting an open file under Windows
- make sure check is done recursively if deleting directories
- fixes pytest-dev#27
  • Loading branch information
mrbean-bremen committed Jun 27, 2016
1 parent 5ede6d5 commit 574ff95
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 10 deletions.
15 changes: 15 additions & 0 deletions fake_filesystem_shutil_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,21 @@ def testRmtree(self):
self.assertFalse(self.filesystem.Exists('%s/subdir' % directory))
self.assertFalse(self.filesystem.Exists('%s/subfile' % directory))

def testRmtreeWithoutPermissionForAFile(self):
self.filesystem.CreateFile('/foo/bar')
self.filesystem.CreateFile('/foo/baz', st_mode=stat.S_IFREG | 0o444)
self.assertRaises(OSError, self.shutil.rmtree, '/foo')
self.assertTrue(self.filesystem.Exists('/foo/baz'))

@unittest.skipIf(sys.platform == 'win32', 'Open files cannot be removed under Windows')
def testRmtreeWithOpenFileFailsUnderWindows(self):
fake_open = fake_filesystem.FakeFileOpen(self.filesystem)
self.filesystem.CreateFile('/foo/bar')
self.filesystem.CreateFile('/foo/baz')
fake_open('/foo/baz', 'r')
self.assertRaises(OSError, self.shutil.rmtree, '/foo')
self.assertTrue(self.filesystem.Exists('/foo/baz'))

def testRmtreeNonExistingDir(self):
directory = 'nonexisting'
self.assertRaises(IOError, self.shutil.rmtree, directory)
Expand Down
14 changes: 14 additions & 0 deletions fake_filesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,20 @@ def testRemoveFileNoDirectory(self):
self.os.remove(file_name)
self.assertFalse(self.filesystem.Exists(file_path))

def testRemoveFileWithoutPermissionRaises(self):
path = self.os.path.join('/foo/bar')
self.filesystem.CreateFile(path)
self.os.chmod(path, 0o444)
self.assertRaises(OSError, self.os.remove, path)

@unittest.skipIf(not TestCase.is_windows, 'Open files cannot be removed under Windows')
def testRemoveOpenFileFailsUnderWindows(self):
fake_open = fake_filesystem.FakeFileOpen(self.filesystem)
path = self.os.path.join('/foo/bar')
self.filesystem.CreateFile(path)
fake_open(path, 'r')
self.assertRaises(OSError, self.os.remove, path)

def testRemoveFileRelativePath(self):
original_dir = self.os.getcwd()
directory = 'zzy'
Expand Down
2 changes: 1 addition & 1 deletion fake_tempfile_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def testNamedTemporaryFile(self):
self.assertEqual(created_filenames[0], obj.name)
self.assertTrue(self.filesystem.GetObject(obj.name))
obj.close()
self.assertRaises(IOError, self.filesystem.GetObject, obj.name)
self.assertRaises(IOError, self.filesystem.GetObject, created_filenames[0])

def testNamedTemporaryFileNoDelete(self):
obj = self.tempfile.NamedTemporaryFile(delete=False)
Expand Down
38 changes: 29 additions & 9 deletions pyfakefs/fake_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,17 +422,26 @@ def GetEntry(self, pathname_name):
"""
return self.contents[pathname_name]

def RemoveEntry(self, pathname_name):
def RemoveEntry(self, pathname_name, recursive=True):
"""Removes the specified child file or directory.
Args:
pathname_name: basename of the child object to remove
recursive: if True (default), the entries in contained directories are deleted first
Needed to propagate removal errors (e.g. permission problems) from contained entries.
Raises:
KeyError: if no child exists by the specified name
"""
entry = self.contents[pathname_name]
if self.filesystem and entry.st_nlink == 1:
if entry.st_mode & PERM_WRITE == 0:
raise OSError(errno.EACCES, 'Trying to remove object without write permission', pathname_name)
if _is_windows and self.filesystem and self.filesystem.HasOpenFile(entry):
raise OSError(errno.EACCES, 'Trying to remove an open file', pathname_name)
if recursive and isinstance(entry, FakeDirectory):
while entry.contents:
entry.RemoveEntry(list(entry.contents)[0])
elif self.filesystem and entry.st_nlink == 1:
self.filesystem.ChangeDiskUsage(-entry.GetSize(), pathname_name, entry.st_dev)

entry.st_nlink -= 1
Expand Down Expand Up @@ -640,16 +649,16 @@ def AddOpenFile(self, file_obj):
self.open_files.append(file_obj)
return len(self.open_files) - 1

def CloseOpenFile(self, file_obj):
"""Removes file_obj from the list of open files on the filesystem.
def CloseOpenFile(self, file_des):
"""Removes file object with given descriptor from the list of open files.
Sets the entry in open_files to None.
Args:
file_obj: file object to be removed to open files list.
file_des: descriptor of file object to be removed from open files list.
"""
self.open_files[file_obj.filedes] = None
heapq.heappush(self.free_fd_heap, file_obj.filedes)
self.open_files[file_des] = None
heapq.heappush(self.free_fd_heap, file_des)

def GetOpenFile(self, file_des):
"""Returns an open file.
Expand All @@ -671,6 +680,17 @@ def GetOpenFile(self, file_des):
raise OSError(errno.EBADF, 'Bad file descriptor', file_des)
return self.open_files[file_des]

def HasOpenFile(self, file_object):
"""Returns True if the given file object is in the list of open files.
Args:
file_object: The FakeFile object to be checked.
Returns:
True if the file is open.
"""
return file_object in [wrapper.GetObject() for wrapper in self.open_files if wrapper]

def NormalizePathSeparator(self, path):
"""Replace all appearances of alternative path separator with path separator.
Do nothing if no alternative separator is set.
Expand Down Expand Up @@ -1275,7 +1295,7 @@ def RenameObject(self, old_file, new_file):
old_file)

object_to_rename = old_dir_object.GetEntry(old_name)
old_dir_object.RemoveEntry(old_name)
old_dir_object.RemoveEntry(old_name, recursive=False)
object_to_rename.name = new_name
new_dir_object.AddEntry(object_to_rename)

Expand Down Expand Up @@ -2523,7 +2543,7 @@ def close(self):
if self._update:
self._file_object.SetContents(self._io.getvalue())
if self._closefd:
self._filesystem.CloseOpenFile(self)
self._filesystem.CloseOpenFile(self.filedes)
if self._delete_on_close:
self._filesystem.RemoveObject(self.name)

Expand Down
2 changes: 2 additions & 0 deletions pyfakefs/fake_tempfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ def NamedTemporaryFile(self, mode='w+b', bufsize=-1,
self._filesystem, delete_on_close=delete)
obj = mock_open(filename, mode)
obj.name = filename
# remove file wrapper created by mkstemp to avoid double open file entries
self._filesystem.CloseOpenFile(temp[0])
return obj

# pylint: disable-msg=C6409
Expand Down

0 comments on commit 574ff95

Please sign in to comment.