Skip to content

Commit

Permalink
buffer header writes. fallocate is a future.
Browse files Browse the repository at this point in the history
Buffering headers won't work in incremental mode, so preserve the old
path
  • Loading branch information
bchess committed Apr 26, 2024
1 parent 5631c37 commit 613e555
Showing 1 changed file with 45 additions and 10 deletions.
55 changes: 45 additions & 10 deletions tensorizer/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -4449,7 +4449,7 @@ def do_commit(
write_specs: Sequence[TensorSerializer._WriteSpec],
dependencies: Sequence[_Future],
):

# Fast version: makes one buffer containing the size, metadata, and headers, and writes it one go
header_block_size = self._header_cur - self._metadata_start
header_buffer = bytearray(header_block_size)

Expand Down Expand Up @@ -4478,15 +4478,50 @@ def do_commit(
header_buffer, metadata_start, verify=header_block_size
)

deps = [
w.tensor_data_task
for w in write_specs_
if w.tensor_data_task is not None
]
commit_header_task = self._header_writer_pool.submit(
do_commit, list(write_specs_), deps
)
self._jobs.append(commit_header_task)
def do_commit_incremental(write_spec, dependency: _Future):
# Slow version: issues one write for each metadata and one write for each header
if dependency is not None:
dependency.result(_TIMEOUT)
self._pwrite(
write_spec.header.metadata_entry,
write_spec.metadata_pos,
verify=len(write_spec.header.metadata_entry),
)
self._pwrite(
write_spec.header.buffer,
write_spec.header.file_offset,
verify=write_spec.header.size,
)

if write_specs_[0].metadata_pos == self._metadata_start + 8:
deps = [
w.tensor_data_task
for w in write_specs_
if w.tensor_data_task is not None
]
commit_header_task = self._header_writer_pool.submit(
do_commit, list(write_specs_), deps
)

self._jobs.append(commit_header_task)
else:
# We've already written headers (we're in incremental mode)
# So we can't just batch up and write all of them at once
for w in write_specs_:
# Note this does _not_ set w.tensor_data_task, as committing headers is safe
self._header_writer_pool.submit(
do_commit_incremental, w, w.tensor_data_task
)
metadata_size = (
self._metadata_cur - self._metadata_start - 8
) # 8 bytes for metadata length field
metadata_size_task = self._header_writer_pool.submit(
self._pwrite,
struct.pack("<Q", metadata_size),
self._metadata_start,
verify=8,
)
self._jobs.append(metadata_size_task)

def _do_commit_tensor_data(self, write_specs: Sequence[_WriteSpec]):
def commit_tensor_data(
Expand Down

0 comments on commit 613e555

Please sign in to comment.