Skip to content

Commit

Permalink
implement multiprocessing to circumvent Python GIL
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron Kanzer authored and Aaron Kanzer committed Nov 22, 2024
1 parent df7bcd3 commit b59ca09
Showing 1 changed file with 42 additions and 39 deletions.
81 changes: 42 additions & 39 deletions linc_convert/modalities/lsm/mosaic.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
import re
from glob import glob
from typing import Literal
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import Pool
from functools import partial

# externals
import cyclopts
Expand All @@ -31,8 +32,8 @@
mosaic = cyclopts.App(name="mosaic", help_format="markdown")
lsm.command(mosaic)

def write_plane(tswriter, subc, zstart, subz, ystart, yx_shape, dat):
"""Write a single plane of data into the Zarr file."""
def write_plane_multiprocess(tswriter, subc, zstart, subz, ystart, yx_shape, dat):
"""Write a single plane of data into the Zarr file (multiprocessing)."""

Check failure on line 36 in linc_convert/modalities/lsm/mosaic.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN201)

linc_convert/modalities/lsm/mosaic.py:36:5: ANN201 Missing return type annotation for public function `write_plane_multiprocess`

Check failure on line 36 in linc_convert/modalities/lsm/mosaic.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN001)

linc_convert/modalities/lsm/mosaic.py:36:30: ANN001 Missing type annotation for function argument `tswriter`

Check failure on line 36 in linc_convert/modalities/lsm/mosaic.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN001)

linc_convert/modalities/lsm/mosaic.py:36:40: ANN001 Missing type annotation for function argument `subc`

Check failure on line 36 in linc_convert/modalities/lsm/mosaic.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN001)

linc_convert/modalities/lsm/mosaic.py:36:46: ANN001 Missing type annotation for function argument `zstart`

Check failure on line 36 in linc_convert/modalities/lsm/mosaic.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN001)

linc_convert/modalities/lsm/mosaic.py:36:54: ANN001 Missing type annotation for function argument `subz`

Check failure on line 36 in linc_convert/modalities/lsm/mosaic.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN001)

linc_convert/modalities/lsm/mosaic.py:36:60: ANN001 Missing type annotation for function argument `ystart`

Check failure on line 36 in linc_convert/modalities/lsm/mosaic.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN001)

linc_convert/modalities/lsm/mosaic.py:36:68: ANN001 Missing type annotation for function argument `yx_shape`

Check failure on line 36 in linc_convert/modalities/lsm/mosaic.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN001)

linc_convert/modalities/lsm/mosaic.py:36:78: ANN001 Missing type annotation for function argument `dat`

Check failure on line 36 in linc_convert/modalities/lsm/mosaic.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN201)

linc_convert/modalities/lsm/mosaic.py:36:5: ANN201 Missing return type annotation for public function `write_plane_multiprocess`

Check failure on line 36 in linc_convert/modalities/lsm/mosaic.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN001)

linc_convert/modalities/lsm/mosaic.py:36:30: ANN001 Missing type annotation for function argument `tswriter`

Check failure on line 36 in linc_convert/modalities/lsm/mosaic.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN001)

linc_convert/modalities/lsm/mosaic.py:36:40: ANN001 Missing type annotation for function argument `subc`

Check failure on line 36 in linc_convert/modalities/lsm/mosaic.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN001)

linc_convert/modalities/lsm/mosaic.py:36:46: ANN001 Missing type annotation for function argument `zstart`

Check failure on line 36 in linc_convert/modalities/lsm/mosaic.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN001)

linc_convert/modalities/lsm/mosaic.py:36:54: ANN001 Missing type annotation for function argument `subz`
try:
with ts.Transaction() as txn:
tswriter.with_transaction(txn)[
Expand All @@ -45,6 +46,16 @@ def write_plane(tswriter, subc, zstart, subz, ystart, yx_shape, dat):
print(f"Error writing plane: {e}")
raise

def monitor_and_wait(futures):
"""Monitor resource usage and wait for completion of a batch of futures."""
print("\nProcessing batch...")

Check failure on line 51 in linc_convert/modalities/lsm/mosaic.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN201)

linc_convert/modalities/lsm/mosaic.py:51:5: ANN201 Missing return type annotation for public function `monitor_and_wait`

Check failure on line 51 in linc_convert/modalities/lsm/mosaic.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN001)

linc_convert/modalities/lsm/mosaic.py:51:22: ANN001 Missing type annotation for function argument `futures`
for future in as_completed(futures):
try:
future.result() # Raise any exceptions from the task
except Exception as e:
print(f"Error in parallel write: {e}")
futures.clear()

@mosaic.default
def convert(
inp: str,
Expand Down Expand Up @@ -262,42 +273,34 @@ def convert(

tswriter = ts.open(wconfig).result()

with ThreadPoolExecutor() as executor:
futures = []

for i, dirname in enumerate(all_chunks_info["dirname"]):
chunkz = all_chunks_info["z"][i] - 1
chunky = all_chunks_info["y"][i] - 1
planes = all_chunks_info["planes"][i]

for j, fname in enumerate(planes["fname"]):
subz = planes["z"][j] - 1
subc = planes["c"][j] - 1
yx_shape = planes["yx_shape"][j]

zstart = sum(shape[0][0] for shape in allshapes[:chunkz])
ystart = sum(
shape[1] for subshapes in allshapes for shape in subshapes[:chunky]
)

print(
f"Queueing write plane ({subc:4d}, {zstart + subz:4d}, "
f"{ystart:4d}:{ystart + yx_shape[0]:4d})",
end="\r",
)

# Load data and submit the write task
dat = TiffFile(fname).asarray()
futures.append(
executor.submit(write_plane, tswriter, subc, zstart, subz, ystart, yx_shape, dat)
)

# Wait for all tasks to complete
for future in as_completed(futures):
try:
future.result() # Raise any exceptions from the task
except Exception as e:
print(f"Error in parallel write: {e}")
tasks = []
for i, dirname in enumerate(all_chunks_info["dirname"]):
chunkz = all_chunks_info["z"][i] - 1
chunky = all_chunks_info["y"][i] - 1
planes = all_chunks_info["planes"][i]

for j, fname in enumerate(planes["fname"]):
subz = planes["z"][j] - 1
subc = planes["c"][j] - 1
yx_shape = planes["yx_shape"][j]

zstart = sum(shape[0][0] for shape in allshapes[:chunkz])
ystart = sum(
shape[1] for subshapes in allshapes for shape in subshapes[:chunky]
)

print(
f"Queueing write plane ({subc:4d}, {zstart + subz:4d}, "
f"{ystart:4d}:{ystart + yx_shape[0]:4d})",
end="\r",
)

dat = TiffFile(fname).asarray()
tasks.append((subc, zstart, subz, ystart, yx_shape, dat))

write_func = partial(write_plane_multiprocess, tswriter)
with Pool(processes=8) as pool:
pool.starmap(write_func, tasks)

print("")

Expand Down

0 comments on commit b59ca09

Please sign in to comment.