diff --git a/README.md b/README.md
index 7ee4c6f..5ab5dad 100644
--- a/README.md
+++ b/README.md
@@ -82,7 +82,7 @@ Otherwise a CLI interface will be provided.
 
 This behaviour can be disabled by supplying `interactive=False` to the connect call.
 
-### Reading data
+## Reading data
 
 Several utility methods are provided for working with OMERO.tables.
 These all support the full range of connection modes.
@@ -113,7 +113,7 @@ my_dataframe.head()
 Returned dataframes also come with a pandas index column, representing the
 original row numbers from the OMERO.table.
 
-### Writing data
+## Writing data
 
 Pandas dataframes can also be written back as new OMERO.tables.
 N.b. It is currently not possible to modify a table on the server.
@@ -121,22 +121,64 @@ N.b. It is currently not possible to modify a table on the server.
 Connection handling works just as it does with downloading, you can provide
 credentials, a token or a connection object.
 
-To upload data, the user needs to specify which OMERO object the table
-will be associated with. To do this, the third and fourth arguments
-should be the object ID and object type. Supported objects are Dataset,
-Well, Plate, Project, Screen and Image.
+To upload data, the user needs to specify which OMERO object(s) the table
+will be associated with. This can be achieved with the `parent_id` and
+`parent_type` arguments. Supported objects are Dataset, Well, Plate,
+Project, Screen, Image and ROI.
 
 ```python
 import pandas
 import omero2pandas
 my_data = pandas.read_csv("/path/to/my_data.csv")
-ann_id = omero2pandas.upload_table(my_data, "Name for table", 142, "Image")
-# Returns the annotation ID of the uploaded file object
+ann_id = omero2pandas.upload_table(my_data, "Name for table",
+                                   parent_id=142, parent_type="Image")
+# Returns the annotation ID of the uploaded FileAnnotation object
 ```
 
 Once uploaded, the table will be accessible on OMERO.web under the file
 annotations panel of the parent object. Using unique table names is advised.
 
+### Linking to multiple objects
+
+To link the table to multiple objects, supply a list of `(Type, ID)`
+tuples via the `links` parameter. The resulting table's FileAnnotation
+will be linked to every object in `links` (plus the
+`parent_type`/`parent_id` target, if one was provided).
+
+```python
+import omero2pandas
+ann_id = omero2pandas.upload_table(
+    "/path/to/my.csv", "My table",
+    links=[("Image", 101), ("Dataset", 2), ("Roi", 1923)])
+# Uploads with annotation links to Image 101, Dataset 2 and ROI 1923
+```
+
+Links allow OMERO.web to display the resulting table as an annotation
+associated with those objects.
+
+### Large tables
+
+The first argument to `upload_table` can be a pandas dataframe or a path to
+a .csv file containing the table data. In the latter case the table is read
+in chunks corresponding to the `chunk_size` argument, allowing you to
+upload tables which are too large to load into system memory.
+
+```python
+import omero2pandas
+ann_id = omero2pandas.upload_table("/path/to/my.csv", "My table",
+                                   parent_id=142, chunk_size=100)
+# Reads and uploads the file to Image 142, loading 100 lines at a time
+```
+
+The `chunk_size` argument sets how many rows to send with each call to the
+server. If not specified, omero2pandas will attempt to automatically
+optimise the chunk size to send ~2 million table cells per call (up to a
+maximum of 50,000 rows per message for narrow tables).
+
 # Advanced Usage
 
 This package also contains utility functions for managing an OMERO connection.
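For reference, the automatic chunk sizing described in the README paragraph above amounts to the following arithmetic; this is a minimal standalone sketch mirroring the `optimal_chunk_size` helper added in `omero2pandas/upload.py` below:

```python
# Sketch of the automatic chunk-size heuristic: target ~2 million table
# cells per server call, capped at 50,000 rows for narrow tables.
def optimal_chunk_size(column_count):
    rows = 2_000_000 // column_count
    return max(min(rows, 50_000), 1)

print(optimal_chunk_size(10))   # 50000 rows (2M/10 = 200,000, capped)
print(optimal_chunk_size(400))  # 5000 rows (~2 million cells per call)
```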
diff --git a/omero2pandas/__init__.py b/omero2pandas/__init__.py
index dd8e687..e9bd95a 100644
--- a/omero2pandas/__init__.py
+++ b/omero2pandas/__init__.py
@@ -10,6 +10,7 @@
 import logging
 import os
 import sys
+from collections.abc import Iterable
 
 import pandas
 import omero
@@ -182,20 +183,23 @@ def read_table(file_id=None, annotation_id=None, column_names=(), rows=None,
     return df
 
 
-def upload_table(dataframe, table_name, parent_id, parent_type='Image',
-                 chunk_size=1000, omero_connector=None, server=None,
-                 port=4064, username=None, password=None):
+def upload_table(source, table_name, parent_id=None, parent_type='Image',
+                 links=None, chunk_size=None, omero_connector=None,
+                 server=None, port=4064, username=None, password=None):
     """
-    Upload a pandas dataframe to a new OMERO table.
+    Upload a pandas dataframe or CSV file to a new OMERO table.
     For the connection, supply either an active client object or server
     credentials (not both!). If neither are provided the program will search
     for an OMERO user token on the system.
-    :param dataframe: Pandas dataframe to upload to OMERO
+    :param source: Pandas dataframe or CSV file path to upload to OMERO
     :param table_name: Name for the table on OMERO
     :param parent_id: Object ID to attach the table to as an annotation.
     :param parent_type: Object type to attach to.
-    One of: Image, Dataset, Plate, Well
-    :param chunk_size: Rows to transmit to the server in a single operation
+    One of: Image, Dataset, Project, Well, Plate, Screen, Roi
+    :param links: List of (Type, ID) tuples specifying objects to
+    link the table to.
+    :param chunk_size: Rows to transmit to the server in a single operation.
+    Default: automatically choose a size
     :param omero_connector: OMERO.client object which is already connected
     to a server. Supersedes any other connection details.
     :param server: Address of the server
@@ -204,12 +208,28 @@ def upload_table(dataframe, table_name, parent_id, parent_type='Image',
     :param password: Password for server login
     :return: File Annotation ID of the new table
     """
+    # Coerce the accepted input styles into a mutable list of links
+    if links is None:
+        links = []
+    elif isinstance(links, str) or not isinstance(links, Iterable):
+        raise ValueError(f"Links should be an iterable list of "
+                         f"type/id pairs, not {type(links)}")
+    else:
+        # Make sure it's mutable
+        links = list(links)
+    if (len(links) == 2 and
+            isinstance(links[0], str) and isinstance(links[1], int)):
+        # Someone forgot to nest their tuples, let's fix that
+        links = [links]
+    if parent_id is not None:
+        if (parent_type, parent_id) not in links:
+            links.append((parent_type, parent_id))
+    if not links:
+        raise ValueError("No OMERO objects to link the table to")
     with OMEROConnection(server=server, username=username, password=password,
                          port=port, client=omero_connector) as connector:
         conn = connector.get_gateway()
         conn.SERVICE_OPTS.setOmeroGroup('-1')
-        ann_id = create_table(dataframe, table_name, parent_id, parent_type,
-                              conn, chunk_size)
+        ann_id = create_table(source, table_name, links, conn, chunk_size)
         if ann_id is None:
             LOGGER.warning("Failed to create OMERO table")
         return ann_id
@@ -349,7 +369,7 @@ def _get_table(conn, object_type, object_id):
 
     # Load the table
     resources = conn.c.sf.sharedResources()
-    data_table = resources.openTable(orig_file, _ctx=conn.SERVICE_OPTS)
+    data_table = resources.openTable(orig_file, conn.SERVICE_OPTS)
     conn.SERVICE_OPTS.setOmeroGroup(orig_group)
     return data_table
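Given the coercion block added to `upload_table` above, the following call styles end up producing the same annotation link. This is an illustrative sketch only: the IDs are hypothetical and a reachable OMERO server or user token is assumed.

```python
import pandas
import omero2pandas

df = pandas.DataFrame({"x": [1, 2, 3]})

# Equivalent ways to link the uploaded table to Image 142:
omero2pandas.upload_table(df, "My table", parent_id=142, parent_type="Image")
omero2pandas.upload_table(df, "My table", links=[("Image", 142)])
# An un-nested (type, id) pair is wrapped into a list automatically:
omero2pandas.upload_table(df, "My table", links=("Image", 142))
```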
diff --git a/omero2pandas/upload.py b/omero2pandas/upload.py
index 19f4828..f392bf8 100644
--- a/omero2pandas/upload.py
+++ b/omero2pandas/upload.py
@@ -8,9 +8,11 @@
 # support@glencoesoftware.com.
 import logging
 import math
+import os
 
 import omero
 import omero.grid
+import pandas as pd
 from tqdm.auto import tqdm
 
 LOGGER = logging.getLogger(__name__)
@@ -42,6 +44,7 @@
     "Project": omero.model.ProjectAnnotationLinkI,
     "Screen": omero.model.ScreenAnnotationLinkI,
     "Well": omero.model.WellAnnotationLinkI,
+    "Roi": omero.model.RoiAnnotationLinkI,
 }
 
 OBJECT_TYPES = {
@@ -51,11 +54,22 @@
     "Project": omero.model.ProjectI,
     "Screen": omero.model.ScreenI,
     "Well": omero.model.WellI,
+    "Roi": omero.model.RoiI,
 }
 
 
+def optimal_chunk_size(column_count):
+    # We can optimally send ~2 million values at a time
+    rows = 2000000 // column_count
+    if rows > 50000:
+        LOGGER.warning(f"Limiting automatic chunk size to 50000 (was {rows})")
+    return max(min(rows, 50000), 1)
+
+
 def generate_omero_columns(df):
+    # Inspect a pandas dataframe to generate OMERO.tables columns
     omero_columns = []
+    string_columns = []
     for column_name, column_type in df.dtypes.items():
         cleaned_name = column_name.replace('/', '\\')
         if column_name in SPECIAL_NAMES and column_type.kind == 'i':
@@ -66,76 +80,159 @@
             raise NotImplementedError(f"Column type "
                                       f"{column_type} not supported")
         if col_class == omero.grid.StringColumn:
+            string_columns.append(column_name)
             max_len = df[column_name].str.len().max()
             if math.isnan(max_len):
                 max_len = 1
             col = col_class(cleaned_name, "", int(max_len), [])
-            # Coerce missing values into strings
-            df[column_name].fillna('', inplace=True)
         else:
             col = col_class(cleaned_name, "", [])
         omero_columns.append(col)
-    return omero_columns
-
-
-def create_table(df, table_name, parent_id, parent_type, conn, chunk_size):
-    if parent_type not in OBJECT_TYPES:
-        raise NotImplementedError(f"Type {parent_type} not "
-                                  f"supported as a parent object")
-    parent_ob = conn.getObject(parent_type, parent_id)
-    if parent_ob is None:
-        raise ValueError(f"{parent_type} ID {parent_id} not found")
-    parent_group = parent_ob.details.group.id.val
-
-    df = df.copy()
-    orig_columns = df.columns.tolist()
-    columns = generate_omero_columns(df)
-    resources = conn.c.sf.sharedResources(_ctx={
-        "omero.group": str(parent_group)})
+    return omero_columns, string_columns
+
+
+def generate_omero_columns_csv(csv_path, chunk_size=1000):
+    # Inspect a CSV file to generate OMERO.tables columns
+    LOGGER.info(f"Inspecting {csv_path}")
+    scan = pd.read_csv(csv_path, nrows=chunk_size or 1000)
+    LOGGER.debug(f"Shape is {scan.shape[0]}x{scan.shape[1]}")
+    if chunk_size is None:
+        chunk_size = optimal_chunk_size(len(scan.columns))
+    LOGGER.debug(f"Using chunk size {chunk_size}")
+    omero_columns = []
+    to_resolve = {}
+    for idx, (column_name, column_type) in enumerate(scan.dtypes.items()):
+        cleaned_name = column_name.replace('/', '\\')
+        if column_name in SPECIAL_NAMES and column_type.kind == 'i':
+            col_class = SPECIAL_NAMES[column_name]
+        elif column_type.kind in COLUMN_TYPES:
+            col_class = COLUMN_TYPES[column_type.kind]
+        else:
+            raise NotImplementedError(f"Column type "
                                      f"{column_type} not supported")
+        if col_class == omero.grid.StringColumn:
+            max_len = scan[column_name].str.len().max()
+            if math.isnan(max_len):
+                max_len = 1
+            col = col_class(cleaned_name, "", int(max_len), [])
+            to_resolve[column_name] = idx
+        else:
+            col = col_class(cleaned_name, "", [])
+        omero_columns.append(col)
+    LOGGER.debug(f"Generated columns, found {len(to_resolve)} string columns")
+    # Re-read only the string columns (or column 0 if there are none) to
+    # get the row count and the true maximum string lengths
+    use_cols = list(to_resolve.keys()) or [0]
+    row_count = 0
+    LOGGER.info("Scanning CSV for size and column metadata")
+    for chunk in pd.read_csv(csv_path, chunksize=chunk_size,
+                             usecols=use_cols):
+        # Count rows and track the longest value in each string column
+        row_count += len(chunk)
+        for column_name, index in to_resolve.items():
+            max_len = chunk[column_name].str.len().max()
+            if math.isnan(max_len):
+                max_len = 1
+            max_len = int(max_len)
+            omero_columns[index].size = max(max_len, omero_columns[index].size)
+    LOGGER.info(f"Initial scan completed, found {row_count} rows")
+    return omero_columns, list(to_resolve.keys()), row_count, chunk_size
+
+
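+# create_table() drives the upload: it validates the link targets (which
+# must all belong to a single OMERO group), streams the source dataframe
+# or CSV to a new OMERO.table in chunks, then wraps the table's file in a
+# FileAnnotation and links it to every requested object.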
+def create_table(source, table_name, links, conn, chunk_size):
+    # Create an OMERO.table and upload data
+    # Make type names case-insensitive
+    links = [(t.capitalize(), i) for t, i in links]
+    # Validate the link list
+    working_group = None
+    roi_only = True
+    for target_type, target_id in links:
+        if target_type not in OBJECT_TYPES:
+            raise NotImplementedError(f"Type {target_type} not "
+                                      f"supported as a link target")
+        if target_type != "Roi":
+            roi_only = False
+        target_ob = conn.getObject(target_type, target_id)
+        if target_ob is None:
+            raise ValueError(f"{target_type} #{target_id} not found")
+        target_group = target_ob.details.group.id.val
+        if working_group is None:
+            working_group = target_group
+        elif working_group != target_group:
+            raise ValueError("All objects being linked to must belong to "
+                             "the same OMERO group")
+    if roi_only:
+        LOGGER.warning("Only ROIs have been selected to link the table to. "
+                       "Resulting table may not be shown in the OMERO.web UI.")
+    conn.SERVICE_OPTS.setOmeroGroup(working_group)
+
+    progress_monitor = tqdm(
+        desc="Inspecting table...", initial=1, dynamic_ncols=True,
+        bar_format='{desc}: {percentage:3.0f}%|{bar}| '
+                   '{n_fmt}/{total_fmt} rows, {elapsed} {postfix}')
+
+    if isinstance(source, str):
+        if not os.path.exists(source):
+            raise FileNotFoundError(f"Could not find file {source}")
+        columns, str_cols, total_rows, chunk_size = \
+            generate_omero_columns_csv(source, chunk_size)
+        iter_data = pd.read_csv(source, chunksize=chunk_size)
+    else:
+        source = source.copy()
+        columns, str_cols = generate_omero_columns(source)
+        total_rows = len(source)
+        if chunk_size is None:
+            chunk_size = optimal_chunk_size(len(columns))
+        LOGGER.debug(f"Using chunk size {chunk_size}")
+        iter_data = (source.iloc[i:i + chunk_size]
+                     for i in range(0, len(source), chunk_size))
+
+    resources = conn.c.sf.sharedResources({"omero.group": str(working_group)})
     repository_id = resources.repositories().descriptions[0].getId().getValue()
 
     table = None
     try:
-        table = resources.newTable(repository_id, table_name, _ctx={
-            "omero.group": str(parent_group)})
+        table = resources.newTable(repository_id, table_name,
+                                   {"omero.group": str(working_group)})
         table.initialize(columns)
-        total_to_upload = len(df)
-        slicer = range(0, total_to_upload, chunk_size)
-
-        bar_fmt = '{desc}: {percentage:3.0f}%|{bar}| ' \
-                  '{n_fmt}/{total_fmt} rows, {elapsed} {postfix}'
-
-        chunk_iter = tqdm(desc="Uploading table to OMERO",
-                          total=total_to_upload,
-                          bar_format=bar_fmt)
-        for start in slicer:
-            to_upload = df[start:start + chunk_size]
-            for idx, column in enumerate(columns):
-                ref_name = orig_columns[idx]
-                column.values = to_upload[ref_name].tolist()
+        progress_monitor.reset(total=total_rows)
+        progress_monitor.set_description("Uploading table to OMERO")
+
+        for chunk in iter_data:
+            if str_cols:
+                # Coerce missing values into strings
+                chunk.loc[:, str_cols] = chunk.loc[:, str_cols].fillna('')
+            for omero_column, (name, col_data) in zip(columns, chunk.items()):
+                if omero_column.name != name:
+                    LOGGER.debug(f"Matching {omero_column.name} -> {name}")
+                omero_column.values = col_data.tolist()
             table.addData(columns)
-            chunk_iter.update(len(to_upload))
-        chunk_iter.close()
+            progress_monitor.update(len(chunk))
+        progress_monitor.close()
 
-        LOGGER.info("Table creation complete, linking to image")
+        LOGGER.info("Table creation complete, linking to targets")
         orig_file = table.getOriginalFile()
-
-        # create file link
-        link_obj = LINK_TYPES[parent_type]()
-        target_obj = OBJECT_TYPES[parent_type](parent_id, False)
-        # create annotation
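+        # OMERO attaches a file to an object indirectly: the table's
+        # OriginalFile is wrapped in a FileAnnotation, which is then joined
+        # to each target through a typed annotation link built from
+        # unloaded proxies (created with loaded=False) so the server can
+        # link by ID without fetching the full objects.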
+        # Create FileAnnotation from OriginalFile
         annotation = omero.model.FileAnnotationI()
-        # link table to annotation object
         annotation.file = orig_file
-
-        link_obj.link(target_obj, annotation)
-        link_obj = conn.getUpdateService().saveAndReturnObject(
-            link_obj, _ctx={"omero.group": str(parent_group)})
-        LOGGER.info("Saved annotation link")
-
-        LOGGER.info(f"Finished creating table {table_name} under "
-                    f"{parent_type} {parent_id}")
-        return link_obj.child.id.val
+        annotation_obj = conn.getUpdateService().saveAndReturnObject(
+            annotation, {"omero.group": str(working_group)})
+        LOGGER.info(f"Generated FileAnnotation {annotation_obj.id.val} "
+                    f"with OriginalFile {orig_file.id.val}, "
+                    f"linking to targets")
+        # Link the FileAnnotation to all targets
+        unloaded_annotation = omero.model.FileAnnotationI(
+            annotation_obj.id.val, False)
+        link_buffer = []
+        for obj_type, obj_id in links:
+            link_obj = LINK_TYPES[obj_type]()
+            unloaded_target = OBJECT_TYPES[obj_type](obj_id, False)
+            link_obj.link(unloaded_target, unloaded_annotation)
+            link_buffer.append(link_obj)
+        # Transmit links to server
+        conn.getUpdateService().saveArray(
+            link_buffer, {"omero.group": str(working_group)})
+        LOGGER.info(f"Finished creating table {table_name}")
+        return annotation_obj.id.val
     finally:
         if table is not None:
             table.close()
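Putting the pieces together, end-to-end usage of the new API might look like this; a sketch assuming a valid OMERO user token on the system, with hypothetical paths and IDs:

```python
import omero2pandas

# Stream a large CSV to a new OMERO.table, linking it to several objects;
# chunk_size is chosen automatically when not supplied.
ann_id = omero2pandas.upload_table(
    "/path/to/results.csv", "Analysis results",
    links=[("Image", 101), ("Dataset", 2)])

# Read the table back as a pandas dataframe via its FileAnnotation ID.
df = omero2pandas.read_table(annotation_id=ann_id)
print(df.head())
```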