Skip to content

Commit

Permalink
Add MD to OME-Zarr 3D flow (#2)
Browse files Browse the repository at this point in the history
* Add flow to convert MD data to ome-zarr (3D).

---------

Co-authored-by: Jan Eglinger <[email protected]>
  • Loading branch information
tibuch and imagejan authored Mar 14, 2023
1 parent 77c2369 commit a60e817
Show file tree
Hide file tree
Showing 9 changed files with 454 additions and 4 deletions.
9 changes: 5 additions & 4 deletions environment.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
name: faim-hcs
name: prefect-faim-hcs
dependencies:
- python==3.9
- conda-forge::mobie_utils==0.4.2
- pip
- pip:
- prefect
- git+https://github.com/fmi-faim/[email protected]
- git+https://github.com/fmi-faim/[email protected]
- prefect==2.8.4
- git+https://github.com/fmi-faim/[email protected]
- git+https://github.com/fmi-faim/[email protected]
- git+https://github.com/fmi-faim/[email protected]
Empty file.
Empty file.
58 changes: 58 additions & 0 deletions src/prefect_faim_hcs/flows/molecular_devices_to_ome_zarr_3d.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Molecular Devices ImageXpress to OME-Zarr - 3D
Converts a 3D multi-channel multi-well plate acquisition into an OME-Zarr.

## Input Format
Standard Molecular Devices ImageXpress acquisitions can be converted. Such an acquisition can contain z-projections, single z-planes and z-stacks.

## Flow Parameters
* `user`:
* `name`: Name of the user.
* `group`: Group name of the user.
* `run_name`: Name of processing run.
* `acquisition_dir`: Path to the MD ImageXpress acquisition directory.
* `ome_zarr`:
* `output_dir`: Path to where the OME-Zarr is written to.
* `order_name`: Name of the plate order.
* `barcode`: Plate barcode.
* `n_channels`: List of integers indicating the channels.
* `plate_layout`: Either 96-well-plate or 384-well-plate layout.
* `write_empty_chunks`: Set this to `False` if you have acquired single planes alongside full z-stacks.
* `mobie`:
* `project_folder`: MoBIE project folder.
* `dataset_name`: Name of this dataset.
* `description`: Description of the dataset.
* `parallelization`: How many wells are written in parallel. This number is optimized for our setup. __Do not change this.__

## Output Format
The output is an OME-Zarr which extends the [NGFF spec](https://ngff.openmicroscopy.org/latest/#hcs-layout).

All acquired fields of a well are montaged into a `CZYX` stack and saved in the zeroth field of the corresponding well. The respective projections are saved as `CYX` in the sub-group `projections` of the well-group.

### Metadata
Multiple metadata fields are added to the OME-Zarr `.zattrs` files.

`{plate_name}/.zattrs`:
* `barcode`: The barcode of the imaged plate
* `order_name`: Name of the plate order

`{plate_name}/{row}/{col}/0/.zattrs`:
* `acquisition_metadata`: A dictionary with key `channels`.
* `channels`: A list of dictionaries for each acquired channel, with the following keys:
* `channel-name`: Name of the channel during acquisition
* `display-color`: RGB hex-code of the display color
* `exposure-time`
* `exposure-time-unit`
* `objective`: Objective description
* `objective-numerical-aperture`
* `power`: Illumination power used for this channel
* `shading-correction`: Set to `On` if a shading correction was applied automatically.
* `wavelength`: Name of the wavelength as provided by the microscope.
* `histograms`: A list of relative paths to the histograms of each channel.

## Packages
* [faim-hcs](https://github.com/fmi-faim/faim-hcs)
* [mobie-utils-python](https://github.com/mobie/mobie-utils-python)
* [custom-prefect-result](https://github.com/fmi-faim/custom-prefect-result)
* [faim-prefect](https://github.com/fmi-faim/faim-prefect)
* [prefect](https://github.com/PrefectHQ/prefect)
* [prefect-shell](https://github.com/PrefectHQ/prefect-shell)
181 changes: 181 additions & 0 deletions src/prefect_faim_hcs/flows/molecular_devices_to_ome_zarr_3d.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import json
from os import makedirs
from os.path import dirname, exists, join

from cpr.Serializer import cpr_serializer
from faim_hcs.Zarr import PlateLayout
from faim_prefect.block.choices import Choices
from faim_prefect.mamba import log_infrastructure
from faim_prefect.parallelization.utils import wait_for_task_run
from prefect import flow, get_run_logger
from prefect.filesystems import LocalFileSystem
from pydantic import BaseModel

from src.prefect_faim_hcs.tasks.io import get_file_list
from src.prefect_faim_hcs.tasks.mobie import add_mobie_dataset, create_mobie_project
from src.prefect_faim_hcs.tasks.zarr import (
add_well_to_plate_task,
build_zarr_scaffold_task,
)

# Load the saved `Choices` block named "fmi-groups" at import time.
# `groups.get()` supplies the type of the `User.group` field.
# NOTE(review): presumably an Enum of the available group names (the flow reads
# `user.group.value`) — confirm against faim_prefect.
groups = Choices.load("fmi-groups")


# Flow parameter group identifying who runs the flow and how the run is named.
# The three values together determine the run directory:
# <base-output-directory>/<group>/<name>/prefect-runs/<run_name>.
class User(BaseModel):
    # Name of the user; used as a sub-directory below the group directory.
    name: str
    # Group of the user; the type comes from the "fmi-groups" choices block.
    group: groups.get()
    # Name of the processing run; used as the name of the run directory.
    run_name: str


# Flow parameter group describing the OME-Zarr plate that is written.
class OMEZarr(BaseModel):
    # Directory passed as `root_dir` to the zarr scaffold task; validation only
    # logs an error if it does not exist.
    output_dir: str
    # Plate order name, forwarded to the zarr scaffold task.
    order_name: str
    # Plate barcode, forwarded to the zarr scaffold task.
    barcode: str
    # Channel numbers; the flow turns each integer `i` into the channel
    # name `w{i}`.
    n_channels: list[int]
    # Plate layout forwarded as `layout` to the zarr scaffold task.
    plate_layout: PlateLayout = PlateLayout.I384
    # Forwarded unchanged to `add_well_to_plate_task`.
    write_empty_chunks: bool = True


# Flow parameter group describing the MoBIE project and the dataset added to it.
class MoBIE(BaseModel):
    # Root folder of the MoBIE project; project metadata is created there if
    # the folder does not exist yet.
    project_folder: str
    # Name of the dataset added to the project; also the dataset sub-folder.
    dataset_name: str
    # Description stored in the dataset metadata.
    description: str


# Read the flow description (passed to `@flow(description=...)`) from the
# Markdown file that lives next to this module. The path is resolved relative
# to this file rather than the current working directory, so importing the
# module no longer fails when the process is not started from the repo root.
with open(
    join(dirname(__file__), "molecular_devices_to_ome_zarr_3d.md"),
    encoding="UTF-8",
) as f:
    description = f.read()


def validate_parameters(
    user: User,
    acquisition_dir: str,
    ome_zarr: OMEZarr,
    mobie: MoBIE,
    parallelization: int,
):
    """Check the flow parameters and record them in the run directory.

    Each failed check is reported through the Prefect run logger at error
    level. A failed check does not raise; all checks are evaluated and the
    function continues. Afterwards the run directory
    ``<base-output-directory>/<group>/<user.name>/prefect-runs/<user.run_name>``
    is created if necessary and the parameters are written to
    ``parameters.json`` inside it.

    Args:
        user: User name, group and run name.
        acquisition_dir: Acquisition directory; expected to exist.
        ome_zarr: OME-Zarr settings; ``output_dir`` is expected to exist.
        mobie: MoBIE settings; the parent of ``project_folder`` is expected
            to exist.
        parallelization: Expected to be >= 1.

    Returns:
        Path of the run directory.
    """
    log = get_run_logger()
    output_root = LocalFileSystem.load("base-output-directory").basepath
    group_name = user.group.value

    group_dir = join(output_root, group_name)
    if not exists(group_dir):
        log.error(f"Group '{group_name}' does not exist in '{output_root}'.")

    if not exists(acquisition_dir):
        log.error(f"Acquisition directory '{acquisition_dir}' does not exist.")

    zarr_dir = ome_zarr.output_dir
    if not exists(zarr_dir):
        log.error(f"Output directory '{zarr_dir}' does not exist.")

    # Drop a trailing "/" first so that dirname() yields the real parent.
    project_root = mobie.project_folder.removesuffix("/")
    mobie_parent = dirname(project_root)
    if not exists(mobie_parent):
        log.error(f"Output dir for MoBIE project does not exist: {mobie_parent}")

    if parallelization < 1:
        log.error(f"parallelization = {parallelization}. Must be >= 1.")

    run_dir = join(output_root, group_name, user.name, "prefect-runs", user.run_name)

    # The group is stored by its value; the remaining models are dumped as-is.
    user_record = {
        "name": user.name,
        "group": group_name,
        "run_name": user.run_name,
    }
    record = {
        "user": user_record,
        "acquisition_dir": acquisition_dir,
        "ome_zarr": ome_zarr.dict(),
        "mobie": mobie.dict(),
        "parallelization": parallelization,
    }

    makedirs(run_dir, exist_ok=True)
    parameters_path = join(run_dir, "parameters.json")
    with open(parameters_path, "w") as handle:
        handle.write(json.dumps(record, indent=4))

    return run_dir


# Prefect flow: converts a Molecular Devices acquisition into an OME-Zarr plate
# and registers the plate as a dataset in a MoBIE project.
# Returns a tuple of (plate, path of the MoBIE dataset folder).
@flow(
    name="MolecularDevices to OME-Zarr [3D]",
    description=description,
    cache_result_in_memory=False,
    persist_result=True,
    result_serializer=cpr_serializer(),
    result_storage=LocalFileSystem.load("prefect-faim-hcs"),
)
def molecular_devices_to_ome_zarr_3d(
    user: User,
    acquisition_dir: str,
    ome_zarr: OMEZarr,
    mobie: MoBIE,
    parallelization: int = 24,
):
    # Validation logs problems, writes parameters.json and yields the run dir.
    run_dir = validate_parameters(
        user=user,
        acquisition_dir=acquisition_dir,
        ome_zarr=ome_zarr,
        mobie=mobie,
        parallelization=parallelization,
    )

    log = get_run_logger()
    for message in (
        f"Run logs are written to: {run_dir}",
        f"OME-Zarr output-dir: {ome_zarr.output_dir}",
        f"MoBIE output-dir: {mobie.project_folder}",
    ):
        log.info(message)

    files = get_file_list(acquisition_dir=acquisition_dir, run_dir=run_dir)

    plate = build_zarr_scaffold_task(
        root_dir=ome_zarr.output_dir,
        files=files,
        layout=ome_zarr.plate_layout,
        order_name=ome_zarr.order_name,
        barcode=ome_zarr.barcode,
    )

    def _resolve(future):
        # Turn a submitted task run into its result.
        return future.result()

    pending = []
    finished = []
    well_ids = files.get_data()["well"].unique()
    for well_id in well_ids:
        submission = add_well_to_plate_task.submit(
            zarr_source=plate,
            files_proxy=files,
            well=well_id,
            channels=[f"w{i}" for i in ome_zarr.n_channels],
            write_empty_chunks=ome_zarr.write_empty_chunks,
        )
        pending.append(submission)

        # NOTE(review): presumably blocks until no more than `parallelization`
        # submissions are pending and moves completed ones into `finished` —
        # confirm against faim_prefect.parallelization.utils.
        wait_for_task_run(
            results=finished,
            buffer=pending,
            max_buffer_length=parallelization,
            result_insert_fn=_resolve,
        )

    # Final call with a buffer length of 0, made after all wells are submitted.
    wait_for_task_run(
        results=finished,
        buffer=pending,
        max_buffer_length=0,
        result_insert_fn=_resolve,
    )

    create_mobie_project(project_folder=mobie.project_folder)

    add_mobie_dataset(
        project_folder=mobie.project_folder,
        dataset_name=mobie.dataset_name,
        description=mobie.description,
        plate=plate,
        is2d=False,
    )

    log_infrastructure(run_dir)

    mobie_dataset_dir = join(mobie.project_folder, mobie.dataset_name)
    return plate, mobie_dataset_dir


# Script entry point: runs the flow when this module is executed directly.
# NOTE(review): the flow is called without arguments although `user`,
# `acquisition_dir`, `ome_zarr` and `mobie` have no defaults, so this call is
# expected to fail on parameter binding — confirm and supply parameters.
if __name__ == "__main__":
    molecular_devices_to_ome_zarr_3d()
Empty file.
16 changes: 16 additions & 0 deletions src/prefect_faim_hcs/tasks/io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from os.path import basename, join

from cpr.csv.CSVTarget import CSVTarget
from cpr.utilities.utilities import task_input_hash
from faim_hcs.io.MolecularDevicesImageXpress import parse_files
from prefect import task


# Prefect task: parses the acquisition directory and stores the resulting table,
# sorted by all of its columns, in a CSV target at
# <run_dir>/<basename(acquisition_dir)>_files.csv. Returns the CSV target.
@task(cache_key_fn=task_input_hash, refresh_cache=True)
def get_file_list(acquisition_dir: str, run_dir: str):
    csv_path = join(run_dir, basename(acquisition_dir) + "_files.csv")
    target = CSVTarget.from_path(path=csv_path)

    parsed = parse_files(acquisition_dir=acquisition_dir)
    # Sort by every column, in column order.
    sort_columns = parsed.columns.values.tolist()
    target.set_data(parsed.sort_values(by=sort_columns))
    return target
65 changes: 65 additions & 0 deletions src/prefect_faim_hcs/tasks/mobie.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from os.path import exists, join

import mobie.metadata as mom
from cpr.utilities.utilities import task_input_hash
from cpr.zarr.ZarrSource import ZarrSource
from faim_hcs.mobie import add_wells_to_project
from mobie.validation import validate_project
from prefect import get_run_logger, task


# Prefect task: creates the MoBIE project metadata at `project_folder` unless
# that path already exists; either outcome is logged at info level.
@task(cache_key_fn=task_input_hash)
def create_mobie_project(
    project_folder: str,
):
    log = get_run_logger()
    if not exists(project_folder):
        mom.project_metadata.create_project_metadata(root=project_folder)
        log.info(f"Created new MoBIE project at {project_folder}.")
        return
    # An existing path is left untouched.
    log.info(f"MoBIE project at {project_folder} already exists.")


# Prefect task: creates the dataset `dataset_name` inside the MoBIE project at
# `project_folder`, adds the plate's wells twice (well group "0" as view
# "default", well group "0/projections" as view "Projections") and validates
# the project afterwards.
@task(cache_key_fn=task_input_hash)
def add_mobie_dataset(
    project_folder: str,
    dataset_name: str,
    description: str,
    plate: ZarrSource,
    is2d: bool,
):
    log = get_run_logger()
    dataset_dir = join(project_folder, dataset_name)

    mom.dataset_metadata.create_dataset_structure(
        root=project_folder,
        dataset_name=dataset_name,
        file_formats=["ome.zarr"],
    )
    mom.dataset_metadata.create_dataset_metadata(
        dataset_folder=dataset_dir,
        description=description,
        is2d=is2d,
    )
    mom.project_metadata.add_dataset(
        root=project_folder,
        dataset_name=dataset_name,
        is_default=False,
    )

    # One entry per view: (well group, view name, extra keyword arguments).
    # Only the projections view passes a label suffix.
    view_specs = (
        ("0", "default", {}),
        ("0/projections", "Projections", {"label_suffix": "_projection"}),
    )
    for well_group, view_name, extra_kwargs in view_specs:
        add_wells_to_project(
            plate=plate.get_data(),
            dataset_folder=dataset_dir,
            well_group=well_group,
            view_name=view_name,
            **extra_kwargs,
        )

    validate_project(root=project_folder)

    log.info(f"Added {dataset_name} to MoBIE project {project_folder}.")
Loading

0 comments on commit a60e817

Please sign in to comment.