diff --git a/ulc_mm_package/QtGUI/dev_run.py b/ulc_mm_package/QtGUI/dev_run.py index 905be8538..b98da05c3 100644 --- a/ulc_mm_package/QtGUI/dev_run.py +++ b/ulc_mm_package/QtGUI/dev_run.py @@ -84,6 +84,7 @@ def _initializeAttributes(self, external_dir: str, mscope: MalariaScope): in this function to make the __init__ more readable.""" # Flags and counters + self.im_counter = 0 self.update_liveview = 1 self.update_counter = 0 self.num_loops = 50 @@ -182,7 +183,7 @@ def getMetadata(self) -> Dict: pressure_sensor_status = -1 return { - "im_counter": self.data_storage.zw.arr_counter, + "im_counter": self.im_counter, "measurement_type": "placeholder", "sample_type": "placeholder", "timestamp": datetime.now().strftime("%Y-%m-%d-%H%M%S_%f"), @@ -207,8 +208,9 @@ def save(self, image): self.single_save = False if self.continuous_save: - self.data_storage.writeData(image, self.getMetadata()) + self.data_storage.writeData(image, self.getMetadata(), self.im_counter) self.measurementTime.emit(int(perf_counter() - self.start_time)) + self.im_counter += 1 def updateGUIElements(self): self.update_counter += 1 @@ -262,6 +264,7 @@ def takeImage(self): if self.main_dir == None: self.main_dir = self.data_storage.main_dir + self.im_counter = 0 self.start_time = perf_counter() def changeBinningMode(self): @@ -668,7 +671,7 @@ def btnSnapHandler(self): ) end_time = perf_counter() start_time = self.acquisitionThread.start_time - num_images = self.acquisitionThread.data_storage.zw.arr_counter + num_images = self.acquisitionThread.im_counter print( f"{num_images} images taken in {end_time - start_time:.2f}s ({num_images / (end_time-start_time):.2f} fps)" ) diff --git a/ulc_mm_package/QtGUI/gui_constants.py b/ulc_mm_package/QtGUI/gui_constants.py index a5e3a73af..3e1955d63 100644 --- a/ulc_mm_package/QtGUI/gui_constants.py +++ b/ulc_mm_package/QtGUI/gui_constants.py @@ -23,11 +23,6 @@ class STATUS(enum.Enum): # TH sensor update period TH_PERIOD = 5 -# ================ Experiment timeout ================ # -MAX_FRAMES = 20000 # Rounded up from 10 minutes of data at 30 FPS -if SIMULATION: - MAX_FRAMES = 2000 - # ================ Media/links ================ # ICON_PATH = "gui_images/CZB-logo.png" diff --git a/ulc_mm_package/QtGUI/liveview_gui.py b/ulc_mm_package/QtGUI/liveview_gui.py index e7037f936..8a50b06ca 100644 --- a/ulc_mm_package/QtGUI/liveview_gui.py +++ b/ulc_mm_package/QtGUI/liveview_gui.py @@ -25,12 +25,9 @@ from PyQt5.QtCore import Qt, pyqtSlot from PyQt5.QtGui import QPixmap, QIcon +from ulc_mm_package.scope_constants import MAX_FRAMES from ulc_mm_package.image_processing.processing_constants import TOP_PERC_TARGET_VAL -from ulc_mm_package.QtGUI.gui_constants import ( - STATUS, - ICON_PATH, - MAX_FRAMES, -) +from ulc_mm_package.QtGUI.gui_constants import STATUS, ICON_PATH from ulc_mm_package.neural_nets.YOGOInference import ClassCountResult diff --git a/ulc_mm_package/QtGUI/scope_op.py b/ulc_mm_package/QtGUI/scope_op.py index 216f23957..d91587edc 100644 --- a/ulc_mm_package/QtGUI/scope_op.py +++ b/ulc_mm_package/QtGUI/scope_op.py @@ -17,7 +17,11 @@ from ulc_mm_package.hardware.scope_routines import * from ulc_mm_package.QtGUI.acquisition import Acquisition -from ulc_mm_package.scope_constants import PER_IMAGE_METADATA_KEYS, SIMULATION +from ulc_mm_package.scope_constants import ( + PER_IMAGE_METADATA_KEYS, + SIMULATION, + MAX_FRAMES, +) from ulc_mm_package.hardware.hardware_modules import PressureSensorStaleValue from ulc_mm_package.hardware.hardware_constants import DATETIME_FORMAT from ulc_mm_package.neural_nets.NCSModel import AsyncInferenceResult @@ -26,7 +30,6 @@ from ulc_mm_package.QtGUI.gui_constants import ( ACQUISITION_PERIOD, LIVEVIEW_PERIOD, - MAX_FRAMES, STATUS, TH_PERIOD, ) @@ -634,7 +637,7 @@ def run_experiment(self, img, timestamp): self.img_metadata["humidity"] = None self.img_metadata["temperature"] = None - self.mscope.data_storage.writeData(img, self.img_metadata) + self.mscope.data_storage.writeData(img, self.img_metadata, self.count) self.count += 1 qsize = self.mscope.data_storage.zw.executor._work_queue.qsize() diff --git a/ulc_mm_package/hardware/routine_test_harness.py b/ulc_mm_package/hardware/routine_test_harness.py index 79d8a182f..dabdd5e42 100644 --- a/ulc_mm_package/hardware/routine_test_harness.py +++ b/ulc_mm_package/hardware/routine_test_harness.py @@ -218,7 +218,7 @@ def main_acquisition_loop(mscope: MalariaScope): cell_density = cell_density_routine(None) cell_density.send(None) - for img, timestamp in mscope.camera.yieldImages(): + for i, (img, timestamp) in enumerate(mscope.camera.yieldImages()): # Display _displayImage(img) @@ -255,7 +255,7 @@ def main_acquisition_loop(mscope: MalariaScope): fake_per_img_metadata["flowrate"] = flow_val fake_per_img_metadata["temperature"] = mscope.ht_sensor.getTemperature() fake_per_img_metadata["humidity"] = mscope.ht_sensor.getRelativeHumidity() - mscope.data_storage.writeData(img, fake_per_img_metadata) + mscope.data_storage.writeData(img, fake_per_img_metadata, i) # Timed stop condition if perf_counter() - start > stop_time_s: diff --git a/ulc_mm_package/image_processing/data_storage.py b/ulc_mm_package/image_processing/data_storage.py index 9ba3cd599..2a4e13976 100644 --- a/ulc_mm_package/image_processing/data_storage.py +++ b/ulc_mm_package/image_processing/data_storage.py @@ -116,7 +116,7 @@ def createNewExperiment( ) self.zw.createNewFile(filename) - def writeData(self, image: np.ndarray, metadata: Dict): + def writeData(self, image: np.ndarray, metadata: Dict, count: int): """Write a new image and its corresponding metadata. Parameters @@ -131,7 +131,7 @@ def writeData(self, image: np.ndarray, metadata: Dict): if self.zw.writable and perf_counter() - self.prev_write_time > self.dt: self.prev_write_time = perf_counter() - self.zw.threadedWriteSingleArray(image) + self.zw.threadedWriteSingleArray(image, count) self.md_writer.writerow(metadata) def writeSingleImage(self, image: np.ndarray, custom_image_name: str): @@ -214,7 +214,7 @@ def save_uniform_sample(self) -> None: A new subfolder is created in the same folder as the experiment and images are saved as .pngs. """ - num_files = len(self.zw.group) + num_files = self.zw.array.nchunks_initialized try: indices = self._unif_subsequence_distribution( max_val=num_files, @@ -234,7 +234,7 @@ def save_uniform_sample(self) -> None: return for idx in indices: - img = self.zw.group[idx][:] + img = self.zw.array[..., idx] filepath = path.join(sub_seq_path, f"{idx}.png") cv2.imwrite(filepath, img) diff --git a/ulc_mm_package/image_processing/zarrwriter.py b/ulc_mm_package/image_processing/zarrwriter.py index ef7208aaa..4ffc8093b 100644 --- a/ulc_mm_package/image_processing/zarrwriter.py +++ b/ulc_mm_package/image_processing/zarrwriter.py @@ -1,4 +1,4 @@ -f""" Simple Zarr storage format wrapper +""" Simple Zarr storage format wrapper -- Important Links -- Library Documentation: @@ -6,25 +6,21 @@ """ -import functools -import threading -from time import perf_counter -import logging -from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait - +import time import zarr +import logging +import threading +import functools import threading +import numpy as np from time import perf_counter -from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait +from typing import List, Tuple, Optional +from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, Future, wait -from ulc_mm_package.utilities.lock_utils import lock_no_block +from ulc_mm_package.scope_constants import CameraOptions, CAMERA_SELECTION, MAX_FRAMES -WRITE_LOCK = threading.Lock() - - -# ==================== Custom errors =============================== class AttemptingWriteWithoutFile(Exception): def __str__(self): return """ @@ -39,20 +35,16 @@ def __str__(self): return "Write in progress." -# ==================== Main class =============================== class ZarrWriter: - def __init__(self): - self.store = None - self.group = None - self.arr_counter = 0 - self.compressor = None + def __init__(self, camera_selection: CameraOptions = CAMERA_SELECTION): self.writable = False - self.prev_write_time = 0 - self.futures = [] - self.executor = ThreadPoolExecutor(max_workers=1) + self.futures: List[Future] = [] self.logger = logging.getLogger(__name__) + self.executor = ThreadPoolExecutor(max_workers=1) + + self.camera_selection: CameraOptions = camera_selection - def createNewFile(self, filename: str, overwrite: bool = False): + def createNewFile(self, filename: str, overwrite: bool = True): """Create a new zarr file. Parameters @@ -62,15 +54,26 @@ def createNewFile(self, filename: str, overwrite: bool = False): overwrite : bool Will overwrite a file with the existing filename if it exists, otherwise will append. """ - try: - filename = f"{filename}.zip" - if overwrite: - self.store = zarr.ZipStore(filename, mode="x") - else: - self.store = zarr.ZipStore(filename, mode="w") - self.group = zarr.group(store=self.store) - self.arr_counter = 0 + self.store = zarr.ZipStore( + f"{filename}.zip", + mode="w" if overwrite else "x", + ) + self.array = zarr.zeros( + shape=( + self.camera_selection.IMG_HEIGHT, + self.camera_selection.IMG_WIDTH, + MAX_FRAMES, + ), + chunks=( + self.camera_selection.IMG_HEIGHT, + self.camera_selection.IMG_WIDTH, + 1, + ), + compressor=None, + store=self.store, + dtype="u1", + ) self.writable = True except AttributeError as e: self.logger.error( @@ -78,49 +81,58 @@ def createNewFile(self, filename: str, overwrite: bool = False): ) raise IOError(f"Error creating {filename}.zip") - @lock_no_block(WRITE_LOCK, WriteInProgress) - def writeSingleArray(self, data) -> int: + def writeSingleArray(self, data, pos: int) -> None: """Write a single array and optional metadata to the Zarr store. Parameters ---------- - data : np.ndarray - metadata : dict - A dictionary of keys to values to be associated with the given data. + data : np.ndarray - the image to write + pos: int - the index of the zarr array to write to - Returns - ------- - int: - arr_count (id) + Since each `pos` is a different chunk, it is threadsafe - see + https://zarr.readthedocs.io/en/stable/tutorial.html#parallel-computing-and-synchronization """ + if not self.writable: + return try: - self.prev_write_time = perf_counter() - self.group.array( - f"{self.arr_counter}", data=data, compressor=self.compressor - ) - self.arr_counter += 1 - return self.arr_counter + self.array[:, :, pos] = data except Exception as e: self.logger.error( f"zarrwriter.py : writeSingleArray : Exception encountered - {e}" ) raise AttemptingWriteWithoutFile() - def threadedWriteSingleArray(self, *args, **kwargs): - self.futures.append(self.executor.submit(self.writeSingleArray, *args)) + def threadedWriteSingleArray(self, data, pos: int): + f = self.executor.submit(self.writeSingleArray, data, pos) + self.futures.append(f) + + def wait_all(self): + wait(self.futures, return_when=ALL_COMPLETED) - @lock_no_block(WRITE_LOCK, WriteInProgress) def closeFile(self): """Close the Zarr store.""" self.writable = False - wait(self.futures, return_when=ALL_COMPLETED) - self.store.close() - self.store = None + self.wait_all() + + exceptions = [] + for f in self.futures: + if f.exception() is not None: + exceptions.append(f.exception()) + + for i, exc in enumerate(exceptions): + self.logger.error(f"exception in zarrwriter: {exc}") + if i > 10: + self.logger.error( + f"{len(exceptions) - i} exceptions left; {len(exceptions)} total" + ) + break + self.futures = [] + self.store.close() def threadedCloseFile(self): - """Close the file in a separate thread (and locks the ability to write to the file). + """Close the file in a separate thread. This threaded close was written with UI.py in mind, so that the file can be closed while keeping the rest of the GUI responsive. @@ -130,8 +142,3 @@ def threadedCloseFile(self): future: An object that can be polled to check if closing the file has completed """ return self.executor.submit(self.closeFile) - - def __del__(self): - # If the user did not manually close the storage, close it - if self.store != None: - self.store.close() diff --git a/ulc_mm_package/scope_constants.py b/ulc_mm_package/scope_constants.py index 2a7cd67ba..ccfe966de 100644 --- a/ulc_mm_package/scope_constants.py +++ b/ulc_mm_package/scope_constants.py @@ -52,6 +52,7 @@ def img_dims(self) -> ImageDims: if "avt" in VIDEO_PATH: return ImageDims(height=772, width=1032) return ImageDims(height=600, width=800) + raise ValueError("this is impossible because this class is an enum") raise ValueError( f"CameraOptions somehow gained an enum type {self}. " @@ -68,6 +69,10 @@ def IMG_HEIGHT(self) -> int: # ================ Camera constants ================ # +MAX_FRAMES = 20000 # Rounded up from 10 minutes of data at 30 FPS +if SIMULATION: + MAX_FRAMES = 2000 + AVT_VENDOR_ID = 0x1AB2 AVT_PRODUCT_ID = 0x0001