buffer header writes. fallocate is a future
bchess committed Apr 26, 2024
1 parent 03af4b8 commit 4da5cb8
Showing 1 changed file with 55 additions and 35 deletions.
90 changes: 55 additions & 35 deletions tensorizer/serialization.py
@@ -3813,9 +3813,11 @@ def tensor_memoryview(self) -> memoryview:
     def set_min_file_version_number(self, version_number):
         self.min_file_version = max(self.min_file_version, version_number)

-    def _maybe_fallocate(self, tensors: Sequence[_WriteSpec]):
+    def _maybe_fallocate(
+        self, tensors: Sequence[_WriteSpec]
+    ) -> Optional[concurrent.futures.Future]:
         if not _syscalls.has_fallocate() or not self._fd:
-            return
+            return None

         next_pos = self._file.tell()
         size = sum(len(t.name.serialized_()) for t in tensors)
@@ -3828,16 +3830,21 @@ def _maybe_fallocate(self, tensors: Sequence[_WriteSpec]):
         # Rough underestimate of header size
         header_min_size = 24
         size += header_min_size * len(tensors)
-        _syscalls.try_fallocate(
-            self._fd, next_pos, size, suppress_all_errors=True
+
+        return self._header_writer_pool.submit(
+            _syscalls.try_fallocate,
+            self._fd,
+            next_pos,
+            size,
+            suppress_all_errors=True,
         )

     def _bulk_write(self, write_specs: Iterable[_WriteSpec], incremental=False):
         write_specs = list(write_specs)

+        write_dependency: Optional[concurrent.futures.Future] = None
         if not incremental:
-            # TODO: make into a future
-            self._maybe_fallocate(write_specs)
+            write_dependency = self._maybe_fallocate(write_specs)

         for w in write_specs:
             self._path_registry.register_path(w.name)
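
The change above turns the best-effort preallocation into a background task: `_maybe_fallocate` now submits `_syscalls.try_fallocate` to the header writer pool and returns the resulting Future, which `_bulk_write` keeps as `write_dependency` instead of blocking on the syscall up front. A minimal standalone sketch of that pattern, using only the standard library (POSIX-only) rather than tensorizer's internal `_syscalls` helper; the pool size and error handling here are assumptions:

    import concurrent.futures
    import contextlib
    import os

    _pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    def try_fallocate(fd: int, offset: int, length: int) -> None:
        # Best-effort: preallocation is only an optimization, so errors
        # (e.g. an unsupported filesystem) are swallowed.
        with contextlib.suppress(OSError):
            os.posix_fallocate(fd, offset, length)

    def maybe_fallocate(fd: int, offset: int, length: int) -> concurrent.futures.Future:
        # Hand the caller a Future instead of blocking on the syscall here.
        return _pool.submit(try_fallocate, fd, offset, length)

The caller holds on to the returned Future and only blocks on it right before the writes that depend on the reserved space, which is what `_bulk_write` does in the next hunk.
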
@@ -3854,6 +3861,9 @@ def _bulk_write(self, write_specs: Iterable[_WriteSpec], incremental=False):

         if self._encrypted:
             self._do_encryption(write_specs)
+        if write_dependency:
+            write_dependency.result(_TIMEOUT)
+
         self._do_commit_headers(write_specs)
         self._do_commit_tensor_data(write_specs)
         if self._encrypted:
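
Before committing headers and tensor data, `_bulk_write` now blocks on the fallocate Future with `write_dependency.result(_TIMEOUT)`, and the rewritten `_do_commit_headers` further down waits on a whole list of tensor-data futures via `_future_wait_and_raise`. A rough equivalent of that dependency wait, sketched with `concurrent.futures.wait`; the internal helper's exact semantics are assumed:

    import concurrent.futures
    from typing import Optional, Sequence

    def wait_and_raise(
        futures: Sequence[concurrent.futures.Future],
        timeout: Optional[float] = None,
    ) -> None:
        # Block until every dependency settles or the timeout expires.
        done, not_done = concurrent.futures.wait(futures, timeout=timeout)
        if not_done:
            raise TimeoutError(f"{len(not_done)} write dependencies still pending")
        for f in done:
            f.result()  # re-raises the stored exception if the task failed
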
@@ -4434,39 +4444,49 @@ def encrypt(write_spec, dependency: Optional[_Future]):
             )
             self._jobs.append(w.tensor_data_task)

-    def _do_commit_headers(self, write_specs: Sequence[_WriteSpec]) -> None:
-        # TODO: this is lots of tiny writes. Buffer them for performance
-        def commit_header(write_spec, dependency: _Future):
-            if dependency is not None:
-                dependency.result(_TIMEOUT)
-            self._pwrite(
-                write_spec.header.metadata_entry,
-                write_spec.metadata_pos,
-                verify=len(write_spec.header.metadata_entry),
-            )
+    def _do_commit_headers(self, write_specs_: Sequence[_WriteSpec]) -> None:
+        def do_commit(
+            write_specs: Sequence[TensorSerializer._WriteSpec],
+            dependencies: Sequence[_Future],
+        ):
+
+            header_block_size = self._header_cur - self._metadata_start
+            header_buffer = bytearray(header_block_size)
+
+            metadata_start = self._metadata_start
+            metadata_size = (
+                self._metadata_cur - metadata_start - 8
+            )  # 8 bytes for metadata length field
+            struct.pack_into("<Q", header_buffer, 0, metadata_size)
+
+            _future_wait_and_raise(dependencies, _TIMEOUT)
+            for w in write_specs:
+                w_h = w.header
+                assert w_h is not None
+                # fmt: off
+                header_buffer[
+                    w.metadata_pos - metadata_start :
+                    w.metadata_pos + len(w_h.metadata_entry) - metadata_start
+                ] = w_h.metadata_entry
+                header_buffer[
+                    w_h.file_offset - metadata_start :
+                    w_h.file_offset + w_h.size - metadata_start
+                ] = w_h.buffer
+                # fmt: on

             self._pwrite(
-                write_spec.header.buffer,
-                write_spec.header.file_offset,
-                verify=write_spec.header.size,
+                header_buffer, metadata_start, verify=header_block_size
             )

-        metadata_size = (
-            self._metadata_cur - self._metadata_start - 8
-        )  # 8 bytes for metadata length field
-        metadata_size_task = self._header_writer_pool.submit(
-            self._pwrite,
-            struct.pack("<Q", metadata_size),
-            self._metadata_start,
-            verify=8,
+        deps = [
+            w.tensor_data_task
+            for w in write_specs_
+            if w.tensor_data_task is not None
+        ]
+        commit_header_task = self._header_writer_pool.submit(
+            do_commit, list(write_specs_), deps
         )
-        self._jobs.append(metadata_size_task)
-
-        for w in write_specs:
-            commit_header_task = self._header_writer_pool.submit(
-                commit_header, w, w.tensor_data_task
-            )
-            # Note this does _not_ set w.tensor_data_task, as committing headers is safe
-            self._jobs.append(commit_header_task)
+        # Note this does _not_ set w.tensor_data_task, as committing headers is safe
+        self._jobs.append(commit_header_task)

     def _do_commit_tensor_data(self, write_specs: Sequence[_WriteSpec]):
         def commit_tensor_data(
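
The rewritten `_do_commit_headers` is the "buffer header writes" half of the commit: instead of submitting one small `pwrite` per header (the old `commit_header` path), it waits for the tensor-data dependencies, splices every metadata entry and header buffer into a single `bytearray` spanning the header region, and issues one large positioned write. A self-contained sketch of that buffering technique, with a simplified `(offset, payload)` layout standing in for tensorizer's `_WriteSpec` structures:

    import os
    import struct
    from typing import Sequence, Tuple

    def commit_headers(
        fd: int,
        region_start: int,      # file offset where the metadata block begins
        region_size: int,       # total size of the metadata + header block
        metadata_size: int,     # value for the 8-byte length field
        entries: Sequence[Tuple[int, bytes]],  # (absolute file offset, payload)
    ) -> None:
        buf = bytearray(region_size)
        # The block starts with a little-endian length field, mirroring the
        # struct.pack_into("<Q", ...) call in the diff above.
        struct.pack_into("<Q", buf, 0, metadata_size)
        # Splice each entry into the buffer at its offset relative to the block.
        for offset, payload in entries:
            rel = offset - region_start
            buf[rel : rel + len(payload)] = payload
        # One positioned write for the whole block instead of many tiny ones.
        if os.pwrite(fd, buf, region_start) != len(buf):
            raise OSError("short write while committing headers")
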
