Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZarrWriter speedup #195

Merged
merged 25 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
99fae7b
use one array for all images
Axel-Jacobsen Dec 15, 2022
938384c
update docs
Axel-Jacobsen Dec 15, 2022
213a249
use constants for zarr array dimensions
Axel-Jacobsen Dec 15, 2022
c2ec078
check for exceptions when closing zarrwriter
Axel-Jacobsen Dec 15, 2022
81fb8de
TIL list appends are atomic
Axel-Jacobsen Dec 15, 2022
36dd141
autoformat
Axel-Jacobsen Dec 15, 2022
c40899f
logger.error instead of logger
Axel-Jacobsen Dec 15, 2022
92b55b1
w is overwrite
Axel-Jacobsen Dec 15, 2022
8f68471
clean up a little bit
Axel-Jacobsen Dec 15, 2022
2771a58
Merge branch 'zarrwriter-xtra-big' of https://github.com/czbiohub/ulc…
Axel-Jacobsen Dec 15, 2022
1842ad9
sketch out a chunkwriter class
Axel-Jacobsen Dec 16, 2022
166712a
remove ChunkWriter from zarrwriter
Axel-Jacobsen Dec 16, 2022
d53e60f
Merge branch 'master' into zarrwriter-xtra-big
Axel-Jacobsen Dec 17, 2022
c23e4ce
autoformat
Axel-Jacobsen Dec 17, 2022
a14d71e
bugfixes
Axel-Jacobsen Dec 17, 2022
888acc6
bugfixes 2
Axel-Jacobsen Dec 17, 2022
1dc22a3
use image counter in dev_run.py
Dec 17, 2022
b3148ac
Merge branch 'master' into zarrwriter-xtra-big
Dec 19, 2022
09bf6de
maxowkresr = 1
Axel-Jacobsen Dec 19, 2022
e9ab4dc
if using a .zip filename, zarr needs it to be closed explicitly, othe…
Dec 20, 2022
9bd0125
don't use zipstore, use .zarr extension instead
Dec 21, 2022
c34d858
Merge branch 'master' into zarrwriter-xtra-big
Axel-Jacobsen Dec 21, 2022
1f50aab
Merge branch 'master' into zarrwriter-xtra-big
Axel-Jacobsen Dec 22, 2022
eb2e20d
switch back to zipstore
Dec 23, 2022
7a9863e
add back in data storage throttle for dev_run - even with these chang…
Dec 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ulc_mm_package/QtGUI/dev_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ def run(self):
while True:
if self.camera_activated:
try:
for image, timestamp in self.camera.yieldImages():
for i, (image, timestamp) in enumerate(self.camera.yieldImages()):
self.updateGUIElements()
self.save(image)
self.save(image, i)
self.zStack(image)
self.activeFlowControl(image, timestamp)
self._autobrightness(image)
Expand Down Expand Up @@ -192,7 +192,7 @@ def getMetadata(self) -> Dict:
"focus_adjustment": self.af_adjustment_done,
}

def save(self, image):
def save(self, image, idx: int):
if self.single_save:
filename = (
path.join(self.main_dir, datetime.now().strftime(DATETIME_FORMAT))
Expand All @@ -202,7 +202,7 @@ def save(self, image):
self.single_save = False

if self.continuous_save:
self.data_storage.writeData(image, self.getMetadata())
self.data_storage.writeData(image, self.getMetadata(), idx)
self.measurementTime.emit(int(perf_counter() - self.start_time))

def updateGUIElements(self):
Expand Down
5 changes: 0 additions & 5 deletions ulc_mm_package/QtGUI/gui_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ class STATUS(enum.Enum):
# TH sensor update period
TH_PERIOD = 5

# ================ Experiment timeout ================ #
MAX_FRAMES = 20000 # Rounded up from 10 minutes of data at 30 FPS
if SIMULATION:
MAX_FRAMES = 2000

# ================ Media/links ================ #
ICON_PATH = "gui_images/CZB-logo.png"

Expand Down
7 changes: 2 additions & 5 deletions ulc_mm_package/QtGUI/liveview_gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,9 @@
from PyQt5.QtCore import Qt, pyqtSlot
from PyQt5.QtGui import QPixmap, QIcon

from ulc_mm_package.scope_constants import MAX_FRAMES
from ulc_mm_package.image_processing.processing_constants import TOP_PERC_TARGET_VAL
from ulc_mm_package.QtGUI.gui_constants import (
STATUS,
ICON_PATH,
MAX_FRAMES,
)
from ulc_mm_package.QtGUI.gui_constants import STATUS, ICON_PATH
from ulc_mm_package.neural_nets.YOGOInference import ClassCountResult


Expand Down
5 changes: 2 additions & 3 deletions ulc_mm_package/QtGUI/scope_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from ulc_mm_package.hardware.scope import MalariaScope
from ulc_mm_package.hardware.scope_routines import *

from ulc_mm_package.scope_constants import PER_IMAGE_METADATA_KEYS
from ulc_mm_package.scope_constants import PER_IMAGE_METADATA_KEYS, MAX_FRAMES
from ulc_mm_package.hardware.hardware_modules import PressureSensorStaleValue
from ulc_mm_package.hardware.hardware_constants import SIMULATION, DATETIME_FORMAT
from ulc_mm_package.neural_nets.NCSModel import AsyncInferenceResult
Expand All @@ -26,7 +26,6 @@
from ulc_mm_package.QtGUI.gui_constants import (
ACQUISITION_PERIOD,
LIVEVIEW_PERIOD,
MAX_FRAMES,
STATUS,
TH_PERIOD,
)
Expand Down Expand Up @@ -579,7 +578,7 @@ def run_experiment(self, img, timestamp):
self.img_metadata["humidity"] = None
self.img_metadata["temperature"] = None

self.mscope.data_storage.writeData(img, self.img_metadata)
self.mscope.data_storage.writeData(img, self.img_metadata, self.count)
self.count += 1

if self.running:
Expand Down
4 changes: 2 additions & 2 deletions ulc_mm_package/hardware/routine_test_harness.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def main_acquisition_loop(mscope: MalariaScope):
cell_density = cell_density_routine(None)
cell_density.send(None)

for img, timestamp in mscope.camera.yieldImages():
for i, (img, timestamp) in enumerate(mscope.camera.yieldImages()):
# Display
_displayImage(img)

Expand Down Expand Up @@ -248,7 +248,7 @@ def main_acquisition_loop(mscope: MalariaScope):
fake_per_img_metadata["flowrate"] = flow_val
fake_per_img_metadata["temperature"] = mscope.ht_sensor.getTemperature()
fake_per_img_metadata["humidity"] = mscope.ht_sensor.getRelativeHumidity()
mscope.data_storage.writeData(img, fake_per_img_metadata)
mscope.data_storage.writeData(img, fake_per_img_metadata, i)

# Timed stop condition
if perf_counter() - start > stop_time_s:
Expand Down
4 changes: 2 additions & 2 deletions ulc_mm_package/image_processing/data_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def createNewExperiment(
)
self.zw.createNewFile(filename)

def writeData(self, image: np.ndarray, metadata: Dict):
def writeData(self, image: np.ndarray, metadata: Dict, count: int):
"""Write a new image and its corresponding metadata.

Parameters
Expand All @@ -131,7 +131,7 @@ def writeData(self, image: np.ndarray, metadata: Dict):

if self.zw.writable and perf_counter() - self.prev_write_time > self.dt:
self.prev_write_time = perf_counter()
self.zw.threadedWriteSingleArray(image)
self.zw.threadedWriteSingleArray(image, count)
self.md_writer.writerow(metadata)

def writeSingleImage(self, image: np.ndarray, custom_image_name: str):
Expand Down
102 changes: 55 additions & 47 deletions ulc_mm_package/image_processing/zarrwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@

"""

import functools
import threading
from time import perf_counter
import logging
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait

import zarr
import logging
import threading
import functools
import threading

from time import perf_counter
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait
from typing import List
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, Future, wait

from ulc_mm_package.utilities.lock_utils import lock_no_block
from ulc_mm_package.scope_constants import CameraOptions, CAMERA_SELECTION, MAX_FRAMES


WRITE_LOCK = threading.Lock()
Expand All @@ -41,16 +40,14 @@ def __str__(self):

# ==================== Main class ===============================
class ZarrWriter:
def __init__(self):
self.store = None
self.group = None
self.arr_counter = 0
self.compressor = None
def __init__(self, camera_selection: CameraOptions = CAMERA_SELECTION):
self.writable = False
self.prev_write_time = 0
self.futures = []
self.executor = ThreadPoolExecutor(max_workers=1)
self.futures: List[Future] = []
self.futures_lock = threading.Lock()
self.logger = logging.getLogger(__name__)
self.executor = ThreadPoolExecutor(max_workers=16)

self.camera_selection: CameraOptions = camera_selection

def createNewFile(self, filename: str, overwrite: bool = False):
"""Create a new zarr file.
Expand All @@ -62,61 +59,77 @@ def createNewFile(self, filename: str, overwrite: bool = False):
overwrite : bool
Will overwrite a file with the existing filename if it exists, otherwise will append.
"""

try:
filename = f"{filename}.zip"
if overwrite:
self.store = zarr.ZipStore(filename, mode="x")
else:
self.store = zarr.ZipStore(filename, mode="w")
self.group = zarr.group(store=self.store)
self.arr_counter = 0
self.array = zarr.open(
filename,
"x" if overwrite else "w",
shape=(
self.camera_selection.IMG_HEIGHT,
self.camera_selection.IMG_WIDTH,
MAX_FRAMES,
),
chunks=(
self.camera_selection.IMG_HEIGHT,
self.camera_selection.IMG_WIDTH,
1,
),
compressor=None,
dtype="u1",
)
self.writable = True
except AttributeError as e:
self.logger.error(
f"zarrwriter.py : createNewFile : Exception encountered - {e}"
)
raise IOError(f"Error creating {filename}.zip")

@lock_no_block(WRITE_LOCK, WriteInProgress)
def writeSingleArray(self, data) -> int:
def writeSingleArray(self, data, pos: int) -> None:
"""Write a single array and optional metadata to the Zarr store.

Parameters
----------
data : np.ndarray
metadata : dict
A dictionary of keys to values to be associated with the given data.
data : np.ndarray - the image to write
pos: int - the index of the zarr array to write to

Returns
-------
int:
arr_count (id)
Since each `pos` is a different chunk, it is threadsafe - see
https://zarr.readthedocs.io/en/stable/tutorial.html#parallel-computing-and-synchronization
"""
if not self.writable:
return

try:
self.prev_write_time = perf_counter()
self.group.array(
f"{self.arr_counter}", data=data, compressor=self.compressor
)
self.arr_counter += 1
return self.arr_counter
self.array[:, :, pos] = data
except Exception as e:
self.logger.error(
f"zarrwriter.py : writeSingleArray : Exception encountered - {e}"
)
raise AttemptingWriteWithoutFile()

def threadedWriteSingleArray(self, *args, **kwargs):
self.futures.append(self.executor.submit(self.writeSingleArray, *args))
f = self.executor.submit(self.writeSingleArray, *args)
with lock_timeout(self.futures_lock):
self.futures.append(f)

def wait_all(self):
wait(self.futures, return_when=ALL_COMPLETED)

@lock_no_block(WRITE_LOCK, WriteInProgress)
def closeFile(self):
"""Close the Zarr store."""
self.writable = False
wait(self.futures, return_when=ALL_COMPLETED)
self.store.close()
self.store = None
self.wait_all()

exceptions = []
for f in self.futures:
if f.exception is not None:
exceptions.append(f.exception)

for i, exc in enumerate(exceptions):
self.logger.error(f"exception in zarrwriter: {exc}")
if i > 10:
self.logger(f"{len(exceptions) - i} exceptions left; {len(exceptions)} total")
break

self.futures = []

def threadedCloseFile(self):
Expand All @@ -130,8 +143,3 @@ def threadedCloseFile(self):
future: An object that can be polled to check if closing the file has completed
"""
return self.executor.submit(self.closeFile)

def __del__(self):
# If the user did not manually close the storage, close it
if self.store != None:
self.store.close()
6 changes: 6 additions & 0 deletions ulc_mm_package/scope_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def img_dims(self) -> ImageDims:
return ImageDims(height=600, width=800)
elif self == CameraOptions.SIMULATED:
return ImageDims(height=600, width=800)
raise ValueError("this is impossible because this class is an enum")

@property
def IMG_WIDTH(self) -> int:
Expand Down Expand Up @@ -79,6 +80,11 @@ def IMG_HEIGHT(self) -> int:
else:
SSD_DIR = "/media/pi/"

# ================ Experiment timeout ================ #
MAX_FRAMES = 20000 # Rounded up from 10 minutes of data at 30 FPS
if SIMULATION:
MAX_FRAMES = 2000

# ================ Camera constants ================ #
AVT_VENDOR_ID = 0x1AB2
AVT_PRODUCT_ID = 0x0001
Expand Down