
Support CSV inputs for uploader, link to multiple objects #19

Merged · 4 commits · Aug 26, 2024
Changes from 3 commits
40 changes: 38 additions & 2 deletions README.md
@@ -82,7 +82,7 @@ Otherwise a CLI interface will be provided.

This behaviour can be disabled by supplying `interactive=False` to the connect call.

-### Reading data
+## Reading data

Several utility methods are provided for working with OMERO.tables. These all support the full range of connection modes.

@@ -113,7 +113,7 @@ my_dataframe.head()

Returned dataframes also come with a pandas index column, representing the original row numbers from the OMERO.table.
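
A minimal sketch of fetching a table with `read_table` (the annotation ID here is hypothetical; connection handling is covered under Advanced Usage):

```python
import omero2pandas

# Load an OMERO.table into a pandas dataframe via its annotation ID
my_dataframe = omero2pandas.read_table(annotation_id=142)
print(my_dataframe.head())
```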

-### Writing data
+## Writing data

Pandas dataframes can also be written back as new OMERO.tables.
N.b. It is currently not possible to modify a table on the server.
@@ -137,6 +137,42 @@ ann_id = omero2pandas.upload_table(my_data, "Name for table", 142, "Image")
Once uploaded, the table will be accessible on OMERO.web under the file
annotations panel of the parent object. Using unique table names is advised.

### Large Tables
The first argument to `upload_table` can be a pandas dataframe or a path to a
.csv file containing the table data. In the latter case the file is read in
chunks of `chunk_size` rows, allowing you to upload tables which are too large
to load into system memory.

```python
import omero2pandas
ann_id = omero2pandas.upload_table("/path/to/my.csv", "My table",
                                   142, chunk_size=100)
# Reads and uploads the file to Image 142, loading 100 lines at a time
```


### Linking to additional objects

Each table gets linked to a parent object determined by the `parent_id` and
`parent_type` parameters. This becomes the main parent that the table is
associated with in OMERO. You can also supply the `extra_links` parameter to
provide extra objects to link the table to. This should be a list of
tuples in the format `(<target_type>, <target_id>)`.


```python
import omero2pandas
ann_id = omero2pandas.upload_table(
    "/path/to/my.csv", "My table", 142,
    extra_links=[("Image", 101), ("Dataset", 2), ("Roi", 1923)])
# Uploads with additional links to Image 101, Dataset 2 and ROI 1923
```

Extra links allow OMERO.web to display the resulting table as
an annotation associated with multiple objects.
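
As a quick sanity check, the links can be inspected with the standard OMERO
Python API (a sketch only; the server address, credentials and IDs below are
placeholders):

```python
from omero.gateway import BlitzGateway

# The table should now appear among the file annotations of each linked object
with BlitzGateway("user", "password", host="omero.example.org", port=4064) as conn:
    image = conn.getObject("Image", 101)
    for ann in image.listAnnotations():
        print(ann.getId(), type(ann).__name__)
```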



# Advanced Usage

This package also contains utility functions for managing an OMERO connection.
19 changes: 11 additions & 8 deletions omero2pandas/__init__.py
@@ -182,20 +182,23 @@ def read_table(file_id=None, annotation_id=None, column_names=(), rows=None,
    return df


-def upload_table(dataframe, table_name, parent_id, parent_type='Image',
-                 chunk_size=1000, omero_connector=None, server=None,
-                 port=4064, username=None, password=None):
+def upload_table(source, table_name, parent_id, parent_type='Image',
+                 extra_links=(), chunk_size=None, omero_connector=None,
+                 server=None, port=4064, username=None, password=None):
"""
Upload a pandas dataframe to a new OMERO table.
For the connection, supply either an active client object or server
credentials (not both!). If neither are provided the program will search
for an OMERO user token on the system.
:param dataframe: Pandas dataframe to upload to OMERO
:param source: Pandas dataframe or CSV file path to upload to OMERO
:param table_name: Name for the table on OMERO
:param parent_id: Object ID to attach the table to as an annotation.
:param parent_type: Object type to attach to.
One of: Image, Dataset, Plate, Well
:param chunk_size: Rows to transmit to the server in a single operation
:param extra_links: List of (Type, ID) tuples specifying extra objects to
link the table to.
:param chunk_size: Rows to transmit to the server in a single operation.
Default: Automatically choose a size
:param omero_connector: OMERO.client object which is already connected
to a server. Supersedes any other connection details.
:param server: Address of the server
@@ -208,8 +211,8 @@ def upload_table(dataframe, table_name, parent_id, parent_type='Image',
                   port=port, client=omero_connector) as connector:
        conn = connector.get_gateway()
        conn.SERVICE_OPTS.setOmeroGroup('-1')
-        ann_id = create_table(dataframe, table_name, parent_id, parent_type,
-                              conn, chunk_size)
+        ann_id = create_table(source, table_name, parent_id, parent_type,
+                              conn, chunk_size, extra_links)
        if ann_id is None:
            LOGGER.warning("Failed to create OMERO table")
        return ann_id
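
A hedged sketch of calling the updated signature with an existing connection
(`client` is a placeholder `omero.client` object created elsewhere; the IDs
are illustrative):

```python
import omero2pandas

ann_id = omero2pandas.upload_table(
    "/path/to/my.csv", "My table", 142, "Image",
    extra_links=[("Roi", 1923)], omero_connector=client)
```
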
@@ -349,7 +352,7 @@ def _get_table(conn, object_type, object_id):

    # Load the table
    resources = conn.c.sf.sharedResources()
-    data_table = resources.openTable(orig_file, _ctx=conn.SERVICE_OPTS)
+    data_table = resources.openTable(orig_file, conn.SERVICE_OPTS)
    conn.SERVICE_OPTS.setOmeroGroup(orig_group)
    return data_table

177 changes: 145 additions & 32 deletions omero2pandas/upload.py
@@ -8,9 +8,12 @@
# [email protected].
import logging
import math
import os
from typing import Iterable

import omero
import omero.grid
import pandas as pd
from tqdm.auto import tqdm

LOGGER = logging.getLogger(__name__)
@@ -42,6 +45,7 @@
"Project": omero.model.ProjectAnnotationLinkI,
"Screen": omero.model.ScreenAnnotationLinkI,
"Well": omero.model.WellAnnotationLinkI,
"Roi": omero.model.RoiAnnotationLinkI,
}

OBJECT_TYPES = {
@@ -51,11 +55,22 @@
    "Project": omero.model.ProjectI,
    "Screen": omero.model.ScreenI,
    "Well": omero.model.WellI,
    "Roi": omero.model.RoiI,
}


def optimal_chunk_size(column_count):
    # We can optimally send ~2m values at a time
    rows = 2000000 // column_count
    if rows > 50000:
        LOGGER.warning(f"Limiting automatic chunk size to 50000 (was {rows})")
    return max(min(rows, 50000), 1)
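
For illustration (not part of the change set), the heuristic works out as
follows for a few column counts:

```python
# ~2 million values per call, clamped to the range [1, 50000] rows
assert optimal_chunk_size(4) == 50000       # 2000000 // 4 = 500000, capped
assert optimal_chunk_size(100) == 20000     # 2000000 // 100
assert optimal_chunk_size(3000000) == 1     # never less than one row
```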


def generate_omero_columns(df):
    # Inspect a pandas dataframe to generate OMERO.tables columns
    omero_columns = []
    string_columns = []
    for column_name, column_type in df.dtypes.items():
        cleaned_name = column_name.replace('/', '\\')
        if column_name in SPECIAL_NAMES and column_type.kind == 'i':
@@ -66,56 +81,139 @@ def generate_omero_columns(df):
raise NotImplementedError(f"Column type "
f"{column_type} not supported")
if col_class == omero.grid.StringColumn:
string_columns.append(column_name)
max_len = df[column_name].str.len().max()
if math.isnan(max_len):
max_len = 1
col = col_class(cleaned_name, "", int(max_len), [])
# Coerce missing values into strings
df[column_name].fillna('', inplace=True)
else:
col = col_class(cleaned_name, "", [])
omero_columns.append(col)
return omero_columns
return omero_columns, string_columns
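
A sketch of what this helper returns for a small dataframe (the dataframe is
hypothetical, and the column classes assume the usual omero.grid mappings):

```python
import pandas as pd

df = pd.DataFrame({"measure": [1, 2], "label": ["a", "bc"]})
columns, string_columns = generate_omero_columns(df)
# columns would resemble:
#   [omero.grid.LongColumn('measure', '', []),
#    omero.grid.StringColumn('label', '', 2, [])]  # size 2 = longest value
# string_columns -> ['label']
```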


def generate_omero_columns_csv(csv_path, chunk_size=1000):
    # Inspect a CSV file to generate OMERO.tables columns
    LOGGER.info(f"Inspecting {csv_path}")
    scan = pd.read_csv(csv_path, nrows=chunk_size or 1000)
    LOGGER.debug(f"Shape is {scan.shape[0]}x{scan.shape[1]}")
    if chunk_size is None:
        chunk_size = optimal_chunk_size(len(scan.columns))
    LOGGER.debug(f"Using chunk size {chunk_size}")
    omero_columns = []
    to_resolve = {}
    for idx, (column_name, column_type) in enumerate(scan.dtypes.items()):
        cleaned_name = column_name.replace('/', '\\')
        if column_name in SPECIAL_NAMES and column_type.kind == 'i':
            col_class = SPECIAL_NAMES[column_name]
        elif column_type.kind in COLUMN_TYPES:
            col_class = COLUMN_TYPES[column_type.kind]
        else:
            raise NotImplementedError(f"Column type "
                                      f"{column_type} not supported")
        if col_class == omero.grid.StringColumn:
> **@chris-allan** (Member, Aug 1, 2024): As below, import this? There are
> also features in omero.columns (which has the concrete implementations of
> the omero.grid column types) that you may find useful.
>
> **@DavidStirling** (Member, Author): We use a lot of classes from omero.grid
> and omero.model, so I'd just left it as a full import. I wasn't aware that
> omero.columns existed, it's not mentioned in the main Python docs at all.
> Probably worth discussing whether there's any benefit to be had from using
> those instead of grid, and if so it could be best as a dedicated PR.
            max_len = scan[column_name].str.len().max()
            if math.isnan(max_len):
                max_len = 1
            col = col_class(cleaned_name, "", int(max_len), [])
            to_resolve[column_name] = idx
        else:
            col = col_class(cleaned_name, "", [])
        omero_columns.append(col)
    LOGGER.debug(f"Generated columns, found {len(to_resolve)} string columns")
    # Use a subset of columns to get row count and string lengths
    use_cols = to_resolve.keys() or [0]
    row_count = 0
    LOGGER.info("Scanning CSV for size and column metadata")
    for chunk in pd.read_csv(csv_path, chunksize=chunk_size, usecols=use_cols):
        # Each chunk is a dataframe; accumulate row count and string widths
        row_count += len(chunk)
        for column_name, index in to_resolve.items():
            max_len = chunk[column_name].str.len().max()
            if math.isnan(max_len):
                max_len = 1
            max_len = int(max_len)
            omero_columns[index].size = max(max_len, omero_columns[index].size)
    LOGGER.info(f"Initial scan completed, found {row_count} rows")
    return omero_columns, to_resolve.keys(), row_count, chunk_size
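
Hypothetical usage of this scan, with `chunk_size=None` triggering the
automatic sizing described above:

```python
columns, str_cols, row_count, chunk_size = generate_omero_columns_csv(
    "/path/to/my.csv", chunk_size=None)
# columns    -> omero.grid column objects, string sizes set from the full scan
# str_cols   -> names of string columns (missing values coerced at upload time)
# row_count  -> total row count, used to size the progress bar
# chunk_size -> the automatically chosen chunk size
```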


-def create_table(df, table_name, parent_id, parent_type, conn, chunk_size):
+def create_table(source, table_name, parent_id, parent_type, conn, chunk_size,
+                 extra_links):
    # Create an OMERO.table and upload data
    # Make type case-insensitive
    parent_type = parent_type.lower().capitalize()
    if parent_type not in OBJECT_TYPES:
        raise NotImplementedError(f"Type {parent_type} not "
                                  f"supported as a parent object")
    elif parent_type == "Roi":
        LOGGER.warning("ROI selected as the primary attachment target, "
                       "resulting table may not be shown in OMERO.web UI.")
    parent_ob = conn.getObject(parent_type, parent_id)
    if parent_ob is None:
        raise ValueError(f"{parent_type} ID {parent_id} not found")
    parent_group = parent_ob.details.group.id.val

-    df = df.copy()
-    orig_columns = df.columns.tolist()
-    columns = generate_omero_columns(df)
-    resources = conn.c.sf.sharedResources(_ctx={
-        "omero.group": str(parent_group)})
+    if extra_links is not None and not isinstance(extra_links, Iterable):
+        raise ValueError(f"Extra Links should be an iterable list of "
+                         f"type/id pairs, not {extra_links}")
    link_to = []
    for ob_type, ob_id in extra_links:
        ob_type = ob_type.lower().capitalize()
        if ob_type not in OBJECT_TYPES:
            raise NotImplementedError(f"Type {ob_type} not "
                                      f"supported as a link target")
        if isinstance(ob_id, str):
            assert ob_id.isdigit(), f"Object ID {ob_id} is not numeric"
            ob_id = int(ob_id)
        link_ob = conn.getObject(ob_type, ob_id)
        if link_ob is None:
            LOGGER.warning(f"{ob_type} ID {ob_id} not found, won't link")
            continue
        link_to.append((ob_type, ob_id))

    progress_monitor = tqdm(
        desc="Inspecting table...", initial=1, dynamic_ncols=True,
        bar_format='{desc}: {percentage:3.0f}%|{bar}| '
                   '{n_fmt}/{total_fmt} rows, {elapsed} {postfix}')

    if isinstance(source, str):
        assert os.path.exists(source), f"Could not find file {source}"
        columns, str_cols, total_rows, chunk_size = generate_omero_columns_csv(
            source, chunk_size)
        iter_data = (chunk for chunk in pd.read_csv(
            source, chunksize=chunk_size))
    else:
        source = source.copy()
        columns, str_cols = generate_omero_columns(source)
        total_rows = len(source)
        if chunk_size is None:
            chunk_size = optimal_chunk_size(len(columns))
            LOGGER.debug(f"Using chunk size {chunk_size}")
        iter_data = (source.iloc[i:i + chunk_size]
                     for i in range(0, len(source), chunk_size))

    resources = conn.c.sf.sharedResources({"omero.group": str(parent_group)})
    repository_id = resources.repositories().descriptions[0].getId().getValue()

    table = None
    try:
-        table = resources.newTable(repository_id, table_name, _ctx={
-            "omero.group": str(parent_group)})
+        table = resources.newTable(repository_id, table_name,
+                                   {"omero.group": str(parent_group)})
        table.initialize(columns)
-        total_to_upload = len(df)
-        slicer = range(0, total_to_upload, chunk_size)
-
-        bar_fmt = '{desc}: {percentage:3.0f}%|{bar}| ' \
-                  '{n_fmt}/{total_fmt} rows, {elapsed} {postfix}'
-
-        chunk_iter = tqdm(desc="Uploading table to OMERO",
-                          total=total_to_upload,
-                          bar_format=bar_fmt)
-        for start in slicer:
-            to_upload = df[start:start + chunk_size]
-            for idx, column in enumerate(columns):
-                ref_name = orig_columns[idx]
-                column.values = to_upload[ref_name].tolist()
+        progress_monitor.reset(total=total_rows)
+        progress_monitor.set_description("Uploading table to OMERO")
+
+        for chunk in iter_data:
+            if str_cols:
+                # Coerce missing values into strings
+                chunk.loc[:, str_cols] = chunk.loc[:, str_cols].fillna('')
+            for omero_column, (name, col_data) in zip(columns, chunk.items()):
+                if omero_column.name != name:
+                    LOGGER.debug(f"Matching {omero_column.name} -> {name}")
+                omero_column.values = col_data.tolist()
            table.addData(columns)
-            chunk_iter.update(len(to_upload))
-        chunk_iter.close()
+            progress_monitor.update(len(chunk))
+        progress_monitor.close()

LOGGER.info("Table creation complete, linking to image")
orig_file = table.getOriginalFile()
@@ -130,12 +228,27 @@ def create_table(df, table_name, parent_id, parent_type, conn, chunk_size):

        link_obj.link(target_obj, annotation)
        link_obj = conn.getUpdateService().saveAndReturnObject(
-            link_obj, _ctx={"omero.group": str(parent_group)})
-        LOGGER.info("Saved annotation link")
-
+            link_obj, {"omero.group": str(parent_group)})
+        annotation_id = link_obj.child.id.val
+        LOGGER.info(f"Uploaded as FileAnnotation {annotation_id}")
        extra_link_objs = []
        unloaded_ann = omero.model.FileAnnotationI(annotation_id, False)
> **Reviewer** (Member): Is there a reason we are not importing
> FileAnnotationI into the package scope?
        for ob_type, ob_id in link_to:
            # Construct additional links
            link_obj = LINK_TYPES[ob_type]()
            target_obj = OBJECT_TYPES[ob_type](ob_id, False)
            link_obj.link(target_obj, unloaded_ann)
            extra_link_objs.append(link_obj)
        if extra_link_objs:
            try:
                conn.getUpdateService().saveArray(
                    extra_link_objs, {"omero.group": str(parent_group)})
                LOGGER.info(f"Added links to {len(extra_link_objs)} objects")
            except Exception:
                LOGGER.error("Failed to create extra links", exc_info=True)
        LOGGER.info(f"Finished creating table {table_name} under "
                    f"{parent_type} {parent_id}")
-        return link_obj.child.id.val
+        return annotation_id
    finally:
        if table is not None:
            table.close()
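
The linking step above uses OMERO's unloaded-proxy pattern: existing server
objects are referenced by ID with `loaded=False`, so links can be saved
without fetching the full objects first. A standalone sketch (the IDs are
hypothetical, and `conn` is assumed to be an active BlitzGateway):

```python
import omero.model

# Link an existing FileAnnotation (ID 501) to an existing Image (ID 101)
link = omero.model.ImageAnnotationLinkI()
link.link(omero.model.ImageI(101, False),           # unloaded Image proxy
          omero.model.FileAnnotationI(501, False))  # unloaded annotation proxy
conn.getUpdateService().saveObject(link)
```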