Properly lock I/O on write operations to avoid issues on concurrent writes
nathanhi committed Mar 2, 2024
1 parent 4273a9e commit 941a42a
Showing 3 changed files with 147 additions and 60 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
@@ -19,6 +19,7 @@ Fixed

* `#34 <https://github.com/nathanhi/pyfatfs/issues/34>`_ (DosDateTime) `PR #35 <https://github.com/nathanhi/pyfatfs/pull/35>`_: Gracefully handle invalid file timestamps by `@beckerben <https://github.com/beckerben>`_
* `#31 <https://github.com/nathanhi/pyfatfs/issues/31>`_ (FATDirectoryEntry): Handle file sizes larger than 4GB gracefully by responding with ``PyFATException`` and ``errno=E2BIG``
* Properly lock I/O on write operations to avoid issues on concurrent writes

Changed
~~~~~~~
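The entry above is the substance of this commit: FatIO's write path is a read-modify-write of shared state (file position, cluster chain, directory-entry file size), so two unsynchronized writers can interleave and silently lose data. A minimal, self-contained sketch of the hazard and of the locking fix, using a toy file object rather than the pyfatfs API:

import threading

class ToyFile:
    """Toy file object: write() is a read-modify-write of shared state."""

    def __init__(self):
        self._lock = threading.Lock()
        self._buf = bytearray()
        self._pos = 0

    def write(self, data: bytes) -> int:
        # Without the lock, two threads can observe the same _pos and
        # one write silently overwrites the other.
        with self._lock:
            start = self._pos
            self._buf[start:start + len(data)] = data
            self._pos = start + len(data)
        return len(data)

f = ToyFile()
threads = [threading.Thread(target=f.write, args=(bytes([i]) * 10,))
           for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert len(f._buf) == 100  # no chunk was lost to an interleaved write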
127 changes: 67 additions & 60 deletions pyfatfs/FatIO.py
@@ -29,7 +29,6 @@ def __init__(self, fs: PyFat,
self.mode = mode
self.fs = fs
self.name = str(path)
# TODO: File locking
self._lock = threading.Lock()

self.dir_entry = self.fs.root_dir.get_entry(path)
@@ -129,37 +128,38 @@ def read(self, size: int = -1) -> Union[bytes, None]:
if not self.mode.reading:
raise IOError("File not open for reading")

# Set size boundary
if size + self.__bpos > self.dir_entry.filesize or size < 0:
size = self.dir_entry.filesize - self.__bpos

if size == 0:
return b""

chunks = []
read_bytes = 0
cluster_offset = self.__coffpos
for c in self.fs.get_cluster_chain(self.__cpos):
chunk_size = self.fs.bytes_per_cluster - cluster_offset
# Do not read past EOF
if read_bytes + chunk_size > size:
chunk_size = size - read_bytes

chunk = self.fs.read_cluster_contents(c)
chunk = chunk[cluster_offset:][:chunk_size]
cluster_offset = 0
chunks.append(chunk)
read_bytes += chunk_size
if read_bytes == size:
break

self.seek(read_bytes, 1)

chunks = b"".join(chunks)
if len(chunks) != size:
raise RuntimeError("Read a different amount of data "
"than was requested.")
return chunks
with self._lock:
# Set size boundary
if size + self.__bpos > self.dir_entry.filesize or size < 0:
size = self.dir_entry.filesize - self.__bpos

if size == 0:
return b""

chunks = []
read_bytes = 0
cluster_offset = self.__coffpos
for c in self.fs.get_cluster_chain(self.__cpos):
chunk_size = self.fs.bytes_per_cluster - cluster_offset
# Do not read past EOF
if read_bytes + chunk_size > size:
chunk_size = size - read_bytes

chunk = self.fs.read_cluster_contents(c)
chunk = chunk[cluster_offset:][:chunk_size]
cluster_offset = 0
chunks.append(chunk)
read_bytes += chunk_size
if read_bytes == size:
break

self.seek(read_bytes, 1)

chunks = b"".join(chunks)
if len(chunks) != size:
raise RuntimeError("Read a different amount of data "
"than was requested.")
return chunks
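read() now takes the same lock, since it also mutates shared state: it advances the file position through seek(). The chunking itself walks the cluster chain and only trims the first chunk by the offset into the current cluster. A worked example with assumed numbers (512-byte clusters, file position 700, 400 bytes requested):

bytes_per_cluster = 512
pos, size = 700, 400
cluster_offset = pos % bytes_per_cluster          # 188 bytes into the 2nd cluster
first_chunk = bytes_per_cluster - cluster_offset  # 324 bytes come from it
second_chunk = size - first_chunk                 # remaining 76 from the next one
assert (first_chunk, second_chunk) == (324, 76)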

def readinto(self, __buffer: bytearray) -> Optional[int]:
"""Read data "directly" into bytearray."""
@@ -175,6 +175,11 @@ def writable(self) -> bool:
return False

def write(self, __b: Union[bytes, bytearray]) -> Optional[int]:
"""Write given bytes to file."""
with self._lock:
return self.__write(__b)

def __write(self, __b: Union[bytes, bytearray]) -> Optional[int]:
"""Write given bytes to file."""
if not self.writable():
raise IOError('Cannot write to read-only file!')
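The split into a locked public write() and a private __write() is the standard pattern for non-reentrant locks: threading.Lock deadlocks if the owning thread tries to acquire it twice, and truncate() below needs to write zero-padding while already holding self._lock, so it calls __write() directly. A reduced sketch of the pattern (class and method names here are illustrative, not pyfatfs API):

import threading

class Recorder:
    def __init__(self):
        self._lock = threading.Lock()  # non-reentrant by design
        self._log = []

    def write(self, entry):
        with self._lock:           # public entry point takes the lock
            self._unlocked_write(entry)

    def _unlocked_write(self, entry):
        self._log.append(entry)    # core logic; caller must hold the lock

    def truncate(self, n):
        with self._lock:
            del self._log[n:]
            # self.write() here would deadlock; use the unlocked core,
            # exactly like truncate() -> __write() in the diff below.
            self._unlocked_write("truncated")

threading.RLock would also avoid the deadlock, but delegating to an unlocked helper keeps the lock non-reentrant and makes the locking discipline explicit.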
@@ -212,31 +217,33 @@ def truncate(self, size: Optional[int] = 0) -> int:
:param size: `int`: Size to truncate to, defaults to 0.
:returns: `int`: Truncated size
"""
cur_pos = self.tell()
size = size if size is not None else cur_pos
if size > self.dir_entry.MAX_FILE_SIZE:
raise PyFATException(f"Unable to truncate file to {size} bytes "
f"as it would exceed FAT file size "
f"limitations.",
errno=errno.E2BIG)

if size > self.dir_entry.get_size():
self.seek(0, 2)
self.write(b'\0' * (size - self.dir_entry.get_size()))
self.seek(cur_pos)
elif size < self.dir_entry.get_size():
# Always keep at least one cluster allocated
num_clusters = max(1, self.fs.calc_num_clusters(size))
i = 0
for c in self.fs.get_cluster_chain(self.dir_entry.get_cluster()):
i += 1
if i <= num_clusters:
continue
self.fs.free_cluster_chain(c)
self.fs.flush_fat()
break

# Update file size
self.dir_entry.filesize = size
self.fs.update_directory_entry(self.dir_entry.get_parent_dir())
return size
with self._lock:
cur_pos = self.tell()
size = size if size is not None else cur_pos
if size > self.dir_entry.MAX_FILE_SIZE:
raise PyFATException(f"Unable to truncate file to {size} "
f"bytes as it would exceed FAT file "
f"size limitations.",
errno=errno.E2BIG)

if size > self.dir_entry.get_size():
self.seek(0, 2)
self.__write(b'\0' * (size - self.dir_entry.get_size()))
self.seek(cur_pos)
elif size < self.dir_entry.get_size():
# Always keep at least one cluster allocated
num_clusters = max(1, self.fs.calc_num_clusters(size))
i = 0
for c in self.fs.get_cluster_chain(
self.dir_entry.get_cluster()):
i += 1
if i <= num_clusters:
continue
self.fs.free_cluster_chain(c)
self.fs.flush_fat()
break

# Update file size
self.dir_entry.filesize = size
self.fs.update_directory_entry(self.dir_entry.get_parent_dir())
return size
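On the shrink path, truncate() keeps max(1, calc_num_clusters(size)) clusters — with calc_num_clusters presumably ceil(size / bytes_per_cluster) — and releases the remainder with a single free_cluster_chain() call on the first cluster past that count. With assumed numbers, 512-byte clusters and a five-cluster chain truncated to 1500 bytes:

import math

bytes_per_cluster = 512
chain = [10, 11, 12, 13, 14]  # hypothetical cluster numbers of one file
size = 1500
keep = max(1, math.ceil(size / bytes_per_cluster))  # 3 clusters remain
assert keep == 3 and chain[keep] == 13  # chain is freed starting at cluster 13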
79 changes: 79 additions & 0 deletions tests/test_PyFatFS.py
@@ -58,6 +58,85 @@ def make_fs(self): # pylint: disable=R0201
"""Create filesystem for PyFilesystem2 integration tests."""
return _make_fs(self.FAT_TYPE)[0]

def test_write_lock(self):
"""Verify concurrent writes to files are processed sequentially."""
from threading import Thread
threads = []
self.fs.create("/WRITE.TXT")

def write_to_file(_f, _i):
_f.write(str(_i) * 10 + "\n")

f = self.fs.open("/WRITE.TXT", "w")
for i in range(0, 10):
t = Thread(target=write_to_file, args=(f, i))
t.start()
threads.append(t)

for t in threads:
t.join()
f.close()

read_text = self.fs.readtext("/WRITE.TXT")
for i in range(0, 10):
self.assertIn(str(i) * 10 + "\n", read_text)

def test_append_lock(self):
"""Verify concurrent appends to files are processed sequentially."""
from threading import Thread
threads = []
self.fs.create("/APPEND.TXT")

def append_to_file(_fs, _i):
_fs.appendtext("/APPEND.TXT", str(_i) * 10 + "\n")

for i in range(0, 10):
t = Thread(target=append_to_file, args=(self.fs, i))
t.start()
threads.append(t)

for t in threads:
t.join()

read_text = self.fs.readtext("/APPEND.TXT")
for i in range(0, 10):
self.assertIn(str(i) * 10 + "\n", read_text)

def test_fs_lock(self):
"""Check for race conditions on concurrent filesystem operations."""
fs, in_memory_fs = _make_fs(self.FAT_TYPE, lazy_load=True)
threads = []

def create_dentries(fs, i):
for n in range(0, 50):
fs.makedirs(f"/root/{n}DIR", recreate=True)
fs.touch(f"/root/{n}.dat")
fs.touch(f"/root/{n}DIR/{n}.dat")
fs.touch(f"/root/{i}.txt")

from threading import Thread
for i in range(0, 10):
t = Thread(target=create_dentries, args=(fs, i))
t.start()
threads.append(t)

for t in threads:
t.join()
in_memory_fs.seek(0)
fs = PyFatBytesIOFS(BytesIO(in_memory_fs.read()),
encoding='UTF-8', lazy_load=True)
expected_dentries_root = []
for i in range(0, 50):
expected_dentries_root.append(f"{i}DIR")
expected_dentries_root.append(f"{i}.dat")
for i in range(0, 10):
expected_dentries_root.append(f"{i}.txt")
# Compare sorted copies; list.sort() sorts in place and returns None,
# so comparing its return values would always pass.
assert sorted(fs.listdir("/root")) == sorted(expected_dentries_root)
for i in range(0, 10):
# Each subdirectory contains exactly the one file created in it.
assert fs.listdir(f"/root/{i}DIR") == [f"{i}.dat"]

def test_lazy_load_dentry_parent_update(self):
"""#33: Verify parent dentry is properly set on lazy-load."""
fs, in_memory_fs = _make_fs(self.FAT_TYPE, lazy_load=True)
