From a60e81760301dc662c0de2864bd087322f2e1a74 Mon Sep 17 00:00:00 2001 From: Tim-Oliver Buchholz Date: Tue, 14 Mar 2023 16:23:43 +0100 Subject: [PATCH] Add MD to OME-Zarr 3D flow (#2) * Add flow to convert MD data to ome-zarr (3D). --------- Co-authored-by: Jan Eglinger --- environment.yaml | 9 +- src/prefect_faim_hcs/__init__.py | 0 src/prefect_faim_hcs/flows/__init__.py | 0 .../flows/molecular_devices_to_ome_zarr_3d.md | 58 ++++++ .../flows/molecular_devices_to_ome_zarr_3d.py | 181 ++++++++++++++++++ src/prefect_faim_hcs/tasks/__init__.py | 0 src/prefect_faim_hcs/tasks/io.py | 16 ++ src/prefect_faim_hcs/tasks/mobie.py | 65 +++++++ src/prefect_faim_hcs/tasks/zarr.py | 129 +++++++++++++ 9 files changed, 454 insertions(+), 4 deletions(-) create mode 100644 src/prefect_faim_hcs/__init__.py create mode 100644 src/prefect_faim_hcs/flows/__init__.py create mode 100644 src/prefect_faim_hcs/flows/molecular_devices_to_ome_zarr_3d.md create mode 100644 src/prefect_faim_hcs/flows/molecular_devices_to_ome_zarr_3d.py create mode 100644 src/prefect_faim_hcs/tasks/__init__.py create mode 100644 src/prefect_faim_hcs/tasks/io.py create mode 100644 src/prefect_faim_hcs/tasks/mobie.py create mode 100644 src/prefect_faim_hcs/tasks/zarr.py diff --git a/environment.yaml b/environment.yaml index 06709c3..0dfc55b 100644 --- a/environment.yaml +++ b/environment.yaml @@ -1,9 +1,10 @@ -name: faim-hcs +name: prefect-faim-hcs dependencies: - python==3.9 - conda-forge::mobie_utils==0.4.2 - pip - pip: - - prefect - - git+https://github.com/fmi-faim/faim-hcs@v0.2.0 - - git+https://github.com/fmi-faim/custom-prefect-result@v0.1.4 + - prefect==2.8.4 + - git+https://github.com/fmi-faim/faim-hcs@v0.2.1 + - git+https://github.com/fmi-faim/custom-prefect-result@v0.1.5 + - git+https://github.com/fmi-faim/faim-prefect@v0.2.1 diff --git a/src/prefect_faim_hcs/__init__.py b/src/prefect_faim_hcs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/prefect_faim_hcs/flows/__init__.py b/src/prefect_faim_hcs/flows/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/prefect_faim_hcs/flows/molecular_devices_to_ome_zarr_3d.md b/src/prefect_faim_hcs/flows/molecular_devices_to_ome_zarr_3d.md new file mode 100644 index 0000000..a243662 --- /dev/null +++ b/src/prefect_faim_hcs/flows/molecular_devices_to_ome_zarr_3d.md @@ -0,0 +1,58 @@ +# Molecular Devices ImageXpress to OME-Zarr - 3D +Converts a 3D multi-channel multi-well plate acquisition into an OME-Zarr. + +## Input Format +Standard Molecular Devices ImageXpress acquistions can be converted. Such an acquisition can contain z-projections, single z-planes and z-stacks. + +## Flow Parameters +* `user`: + * `name`: Name of the user. + * `group`: Group name of the user. + * `run_name`: Name of processing run. +* `acquisition_dir`: Path to the MD ImageXpress acquisition directory. +* `ome_zarr`: + * `output_dir`: Path to where the OME-Zarr is written to. + * `order_name`: Name of the plate order. + * `barcode`: Plate barcode. + * `n_channels`: List of integers indicating the channels. + * `plate_layout`: Either 96-well-plate or 384-well-plate layout. + * `write_empty_chunks`: Set this to `False` if you have acquired single planes alongside full z-stacks. +* `mobie`: + * `project_folder`: MoBIE project folder. + * `dataset_name`: Name of this dataset. + * `description`: Description of the dataset. +* `parallelization`: How many wells are written in parallel. This number if optimized for our setup. __Do not change this.__ + +## Output Format +The output is an OME-Zarr which extends the [NGFF spec](https://ngff.openmicroscopy.org/latest/#hcs-layout). + +All acquired fields of a well are montaged into a `CZYX` stack and saved in the zeroth field of the corresponding well. The respective projections are saved as `CYX` in the sub-group `projecitons` of the well-group. + +### Metadata +Multiple metadata fields are added to the OME-Zarr `.zattrs` files. + +`{plate_name}/.zattrs`: +* `barcode`: The barcode of the imaged plate +* `order_name`: Name of the plate order + +`{plate_name}/{row}/{col}/0/.zattrs`: +* `acquisition_metadata`: A dictionary with key `channels`. + * `channels`: A list of dicitionaries for each acquired channel, with the following keys: + * `channel-name`: Name of the channel during acquisition + * `display-color`: RGB hex-code of the display color + * `exposure-time` + * `exposure-time-unit` + * `objective`: Objective description + * `objective-numerical-aperture` + * `power`: Illumination power used for this channel + * `shading-correction`: Set to `On` if a shading correction was applied automatically. + * `wavelength`: Name of the wavelength as provided by the microscope. +* `histograms`: A list of relative paths to the histograms of each channel. + +## Packages +* [faim-hcs](https://github.com/fmi-faim/faim-hcs) +* [mobie-utils-python](https://github.com/mobie/mobie-utils-python) +* [custom-prefect-result](https://github.com/fmi-faim/custom-prefect-result) +* [faim-prefect](https://github.com/fmi-faim/faim-prefect) +* [prefect](https://github.com/PrefectHQ/prefect) +* [prefect-shell](https://github.com/PrefectHQ/prefect-shell) diff --git a/src/prefect_faim_hcs/flows/molecular_devices_to_ome_zarr_3d.py b/src/prefect_faim_hcs/flows/molecular_devices_to_ome_zarr_3d.py new file mode 100644 index 0000000..c1c78f7 --- /dev/null +++ b/src/prefect_faim_hcs/flows/molecular_devices_to_ome_zarr_3d.py @@ -0,0 +1,181 @@ +import json +from os import makedirs +from os.path import dirname, exists, join + +from cpr.Serializer import cpr_serializer +from faim_hcs.Zarr import PlateLayout +from faim_prefect.block.choices import Choices +from faim_prefect.mamba import log_infrastructure +from faim_prefect.parallelization.utils import wait_for_task_run +from prefect import flow, get_run_logger +from prefect.filesystems import LocalFileSystem +from pydantic import BaseModel + +from src.prefect_faim_hcs.tasks.io import get_file_list +from src.prefect_faim_hcs.tasks.mobie import add_mobie_dataset, create_mobie_project +from src.prefect_faim_hcs.tasks.zarr import ( + add_well_to_plate_task, + build_zarr_scaffold_task, +) + +groups = Choices.load("fmi-groups") + + +class User(BaseModel): + name: str + group: groups.get() + run_name: str + + +class OMEZarr(BaseModel): + output_dir: str + order_name: str + barcode: str + n_channels: list[int] + plate_layout: PlateLayout = PlateLayout.I384 + write_empty_chunks: bool = True + + +class MoBIE(BaseModel): + project_folder: str + dataset_name: str + description: str + + +with open( + join("src/prefect_faim_hcs/flows/molecular_devices_to_ome_zarr_3d.md"), + encoding="UTF-8", +) as f: + description = f.read() + + +def validate_parameters( + user: User, + acquisition_dir: str, + ome_zarr: OMEZarr, + mobie: MoBIE, + parallelization: int, +): + logger = get_run_logger() + base_dir = LocalFileSystem.load("base-output-directory").basepath + group = user.group.value + if not exists(join(base_dir, group)): + logger.error(f"Group '{group}' does not exist in '{base_dir}'.") + + if not exists(acquisition_dir): + logger.error(f"Acquisition directory '{acquisition_dir}' does not " f"exist.") + + if not exists(ome_zarr.output_dir): + logger.error(f"Output directory '{ome_zarr.output_dir}' does not " f"exist.") + + mobie_parent = dirname(mobie.project_folder.removesuffix("/")) + if not exists(mobie_parent): + logger.error(f"Output dir for MoBIE project does not exist: {mobie_parent}") + + if parallelization < 1: + logger.error(f"parallelization = {parallelization}. Must be >= 1.") + + run_dir = join(base_dir, group, user.name, "prefect-runs", user.run_name) + + parameters = { + "user": { + "name": user.name, + "group": group, + "run_name": user.run_name, + }, + "acquisition_dir": acquisition_dir, + "ome_zarr": ome_zarr.dict(), + "mobie": mobie.dict(), + "parallelization": parallelization, + } + + makedirs(run_dir, exist_ok=True) + with open(join(run_dir, "parameters.json"), "w") as f: + f.write(json.dumps(parameters, indent=4)) + + return run_dir + + +@flow( + name="MolecularDevices to OME-Zarr [3D]", + description=description, + cache_result_in_memory=False, + persist_result=True, + result_serializer=cpr_serializer(), + result_storage=LocalFileSystem.load("prefect-faim-hcs"), +) +def molecular_devices_to_ome_zarr_3d( + user: User, + acquisition_dir: str, + ome_zarr: OMEZarr, + mobie: MoBIE, + parallelization: int = 24, +): + run_dir = validate_parameters( + user=user, + acquisition_dir=acquisition_dir, + ome_zarr=ome_zarr, + mobie=mobie, + parallelization=parallelization, + ) + + logger = get_run_logger() + + logger.info(f"Run logs are written to: {run_dir}") + logger.info(f"OME-Zarr output-dir: {ome_zarr.output_dir}") + logger.info(f"MoBIE output-dir: {mobie.project_folder}") + + files = get_file_list(acquisition_dir=acquisition_dir, run_dir=run_dir) + + plate = build_zarr_scaffold_task( + root_dir=ome_zarr.output_dir, + files=files, + layout=ome_zarr.plate_layout, + order_name=ome_zarr.order_name, + barcode=ome_zarr.barcode, + ) + + buffer = [] + wells = [] + for well_id in files.get_data()["well"].unique(): + buffer.append( + add_well_to_plate_task.submit( + zarr_source=plate, + files_proxy=files, + well=well_id, + channels=[f"w{i}" for i in ome_zarr.n_channels], + write_empty_chunks=ome_zarr.write_empty_chunks, + ) + ) + + wait_for_task_run( + results=wells, + buffer=buffer, + max_buffer_length=parallelization, + result_insert_fn=lambda r: r.result(), + ) + + wait_for_task_run( + results=wells, + buffer=buffer, + max_buffer_length=0, + result_insert_fn=lambda r: r.result(), + ) + + create_mobie_project(project_folder=mobie.project_folder) + + add_mobie_dataset( + project_folder=mobie.project_folder, + dataset_name=mobie.dataset_name, + description=mobie.description, + plate=plate, + is2d=False, + ) + + log_infrastructure(run_dir) + + return plate, join(mobie.project_folder, mobie.dataset_name) + + +if __name__ == "__main__": + molecular_devices_to_ome_zarr_3d() diff --git a/src/prefect_faim_hcs/tasks/__init__.py b/src/prefect_faim_hcs/tasks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/prefect_faim_hcs/tasks/io.py b/src/prefect_faim_hcs/tasks/io.py new file mode 100644 index 0000000..8eb27e0 --- /dev/null +++ b/src/prefect_faim_hcs/tasks/io.py @@ -0,0 +1,16 @@ +from os.path import basename, join + +from cpr.csv.CSVTarget import CSVTarget +from cpr.utilities.utilities import task_input_hash +from faim_hcs.io.MolecularDevicesImageXpress import parse_files +from prefect import task + + +@task(cache_key_fn=task_input_hash, refresh_cache=True) +def get_file_list(acquisition_dir: str, run_dir: str): + files = CSVTarget.from_path( + path=join(run_dir, basename(acquisition_dir) + "_files.csv") + ) + df = parse_files(acquisition_dir=acquisition_dir) + files.set_data(df.sort_values(by=df.columns.values.tolist())) + return files diff --git a/src/prefect_faim_hcs/tasks/mobie.py b/src/prefect_faim_hcs/tasks/mobie.py new file mode 100644 index 0000000..c9d55d4 --- /dev/null +++ b/src/prefect_faim_hcs/tasks/mobie.py @@ -0,0 +1,65 @@ +from os.path import exists, join + +import mobie.metadata as mom +from cpr.utilities.utilities import task_input_hash +from cpr.zarr.ZarrSource import ZarrSource +from faim_hcs.mobie import add_wells_to_project +from mobie.validation import validate_project +from prefect import get_run_logger, task + + +@task(cache_key_fn=task_input_hash) +def create_mobie_project( + project_folder: str, +): + logger = get_run_logger() + if exists(project_folder): + logger.info(f"MoBIE project at {project_folder} already exists.") + else: + mom.project_metadata.create_project_metadata(root=project_folder) + logger.info(f"Created new MoBIE project at {project_folder}.") + + +@task(cache_key_fn=task_input_hash) +def add_mobie_dataset( + project_folder: str, + dataset_name: str, + description: str, + plate: ZarrSource, + is2d: bool, +): + logger = get_run_logger() + mom.dataset_metadata.create_dataset_structure( + root=project_folder, + dataset_name=dataset_name, + file_formats=["ome.zarr"], + ) + mom.dataset_metadata.create_dataset_metadata( + dataset_folder=join(project_folder, dataset_name), + description=description, + is2d=is2d, + ) + mom.project_metadata.add_dataset( + root=project_folder, + dataset_name=dataset_name, + is_default=False, + ) + + add_wells_to_project( + plate=plate.get_data(), + dataset_folder=join(project_folder, dataset_name), + well_group="0", + view_name="default", + ) + + add_wells_to_project( + plate=plate.get_data(), + dataset_folder=join(project_folder, dataset_name), + well_group="0/projections", + view_name="Projections", + label_suffix="_projection", + ) + + validate_project(root=project_folder) + + logger.info(f"Added {dataset_name} to MoBIE project {project_folder}.") diff --git a/src/prefect_faim_hcs/tasks/zarr.py b/src/prefect_faim_hcs/tasks/zarr.py new file mode 100644 index 0000000..085f116 --- /dev/null +++ b/src/prefect_faim_hcs/tasks/zarr.py @@ -0,0 +1,129 @@ +from cpr.csv.CSVTarget import CSVTarget +from cpr.utilities.utilities import task_input_hash +from cpr.zarr.ZarrSource import ZarrSource +from faim_hcs.MetaSeriesUtils import ( + get_well_image_CYX, + get_well_image_CZYX, + montage_grid_image_YX, +) +from faim_hcs.Zarr import ( + PlateLayout, + build_zarr_scaffold, + write_cyx_image_to_well, + write_czyx_image_to_well, +) +from pandas import DataFrame +from prefect import get_run_logger, task +from zarr import Group + + +def add_CYX_image_to_zarr_group( + group: Group, + files: DataFrame, + channels: list[str], + write_empty_chunks: bool = True, +): + image, ch_hists, ch_metadata, metadata = get_well_image_CYX( + well_files=files, + channels=channels, + assemble_fn=montage_grid_image_YX, + ) + write_cyx_image_to_well( + img=image, + histograms=ch_hists, + ch_metadata=ch_metadata, + general_metadata=metadata, + group=group, + write_empty_chunks=write_empty_chunks, + ) + + +def add_CZYX_image_to_zarr_group( + group: Group, + files: DataFrame, + channels: list[str], + write_empty_chunks: bool = True, +): + stack, ch_hists, ch_metadata, metadata = get_well_image_CZYX( + well_files=files, + channels=channels, + assemble_fn=montage_grid_image_YX, + ) + write_czyx_image_to_well( + img=stack, + histograms=ch_hists, + ch_metadata=ch_metadata, + general_metadata=metadata, + group=group, + write_empty_chunks=write_empty_chunks, + ) + + +@task(cache_key_fn=task_input_hash) +def build_zarr_scaffold_task( + root_dir: str, + files: CSVTarget, + layout: PlateLayout, + order_name: str, + barcode: str, +): + plate = build_zarr_scaffold( + root_dir=root_dir, + files=files.get_data(), + layout=layout, + order_name=order_name, + barcode=barcode, + ) + + return ZarrSource.from_path( + path=plate.store.path, + group="/", + mode="w", + ) + + +@task(cache_key_fn=task_input_hash) +def add_well_to_plate_task( + zarr_source: ZarrSource, + files_proxy: CSVTarget, + well: str, + channels: list[str], + write_empty_chunks: bool = True, +) -> ZarrSource: + files = files_proxy.get_data() + + logger = get_run_logger() + logger.info(f"Start processing well {well}.") + + plate = zarr_source.get_data() + row = well[0] + col = str(int(well[1:])) + field: Group = plate[row][col][0] + projections = field.create_group("projections") + + well_files = files[files["well"] == well] + + projection_files = well_files[well_files["z"].isnull()] + if len(projection_files) > 0: + add_CYX_image_to_zarr_group( + group=projections, + files=projection_files, + channels=channels, + write_empty_chunks=write_empty_chunks, + ) + + stack_files = well_files[~well_files["z"].isnull()] + if len(stack_files) > 0: + add_CZYX_image_to_zarr_group( + group=field, + files=stack_files, + channels=channels, + write_empty_chunks=write_empty_chunks, + ) + + zarr_well = ZarrSource.from_path( + path=zarr_source.get_path(), + group=f"{row}/{col}/0", + mode="r", + ) + return zarr_well