Use real AWS S3 data tests and apply related fixes #212

Merged
merged 87 commits from move-to-real-s3-tests into main on Jul 3, 2024

Changes from all commits
87 commits
6cc2508
remove moto
d33bs Jun 12, 2024
48ceaec
further remove moto and related
d33bs Jun 13, 2024
77c446e
formatting
d33bs Jun 13, 2024
dd527ba
Merge remote-tracking branch 'upstream/main' into move-to-real-s3-tests
d33bs Jun 13, 2024
f0320d1
simplify sources work
d33bs Jun 13, 2024
a6038d6
simplify sources gathering; fix cloud paths
d33bs Jun 13, 2024
a678cb4
linting and pytest marking
d33bs Jun 14, 2024
2629325
fix utils path; correct pytest marker
d33bs Jun 14, 2024
ae97436
update test
d33bs Jun 16, 2024
0048ce6
update to add new preset for jump data
d33bs Jun 17, 2024
f066875
linting
d33bs Jun 17, 2024
c3dd6e1
add scheduled test specificity
d33bs Jun 17, 2024
dd37904
find shape
d33bs Jun 17, 2024
fe20934
tests for data shape; minor change for parsl conf
d33bs Jun 17, 2024
e788eb6
testing gh actions tests
d33bs Jun 17, 2024
5239324
custom testing for large data tests
d33bs Jun 18, 2024
712e4e1
docs updates; lower chunk size
d33bs Jun 18, 2024
ea0572c
smaller chunk size
d33bs Jun 18, 2024
704e628
updates for profiling
d33bs Jun 18, 2024
49f7262
avoid fail fast; increase test chunk size
d33bs Jun 19, 2024
d0d1ed0
remove explain
d33bs Jun 19, 2024
101c4a3
reduce chunk size
d33bs Jun 19, 2024
5061f51
attempt hte
d33bs Jun 19, 2024
0da51c2
join false for s3 sqlite
d33bs Jun 20, 2024
39ce4d4
no join with threaded
d33bs Jun 20, 2024
a6c17cd
split tests; duckdb mem limit
d33bs Jun 20, 2024
e667ced
update parsl
d33bs Jun 20, 2024
17a5deb
chunk size 2000
d33bs Jun 20, 2024
6e16782
chunk size 1000
d33bs Jun 20, 2024
3647bdb
8gb duckdb memory
d33bs Jun 20, 2024
2406e12
remove concat test
d33bs Jun 20, 2024
404bc05
800 chunk size
d33bs Jun 20, 2024
ee0a79c
remove duckdb mem limit; update ddb + pa
d33bs Jun 20, 2024
16481ad
try unsorted
d33bs Jun 20, 2024
d35bf62
avoid large data copies in joins
d33bs Jun 20, 2024
88b9557
sort output
d33bs Jun 20, 2024
2fc3a7b
chunk size 1000
d33bs Jun 20, 2024
8d74ffa
sort output and remove setting
d33bs Jun 21, 2024
c170b9c
chunk size 3000
d33bs Jun 21, 2024
cf1bafc
readd proper data changes
d33bs Jun 21, 2024
1f410dd
avoid column removal
d33bs Jun 21, 2024
10f5e5a
chunk size 5000; readd col removal
d33bs Jun 21, 2024
abf3b06
7000 chunk size
d33bs Jun 21, 2024
f60a930
10000 chunksize
d33bs Jun 21, 2024
0b82cd8
chunk size 15000
d33bs Jun 21, 2024
a07efe8
20000 chunksize
d33bs Jun 21, 2024
712dca2
chunksize 30000
d33bs Jun 21, 2024
edc3bfe
chunk size 45000
d33bs Jun 21, 2024
64bdf9e
chunksize 40000
d33bs Jun 21, 2024
3d47a3e
chunksize 35000
d33bs Jun 21, 2024
a36659f
chunk size 30000, scoping for test data
d33bs Jun 21, 2024
e597196
use tmp_path
d33bs Jun 21, 2024
3c486a6
chunk size 31000
d33bs Jun 21, 2024
94053e4
add test cytominer-database cmd
d33bs Jun 24, 2024
8c74ff7
poetry env context for cytominer-database
d33bs Jun 24, 2024
79c5732
show command
d33bs Jun 24, 2024
d958553
use static data for cytominer-database
d33bs Jun 25, 2024
ee3c415
provide unique paths for test
d33bs Jun 25, 2024
e8e0635
create temp dependency for testing
d33bs Jun 25, 2024
bbfee94
try threaded tests
d33bs Jun 25, 2024
7eb626f
attempt without temp dir
d33bs Jun 25, 2024
cefa9a7
readd temp dir
d33bs Jun 25, 2024
9424654
show sql
d33bs Jun 25, 2024
245db93
resolve path
d33bs Jun 25, 2024
e8dc54f
show sources
d33bs Jun 25, 2024
69c5b8a
move test files
d33bs Jun 25, 2024
8107dc6
show more about sources
d33bs Jun 25, 2024
10ac951
show sources without conditional
d33bs Jun 25, 2024
1a29b5b
add missing files
d33bs Jun 25, 2024
4eb50d6
remove debug; move tests
d33bs Jun 25, 2024
a985a37
fix ci conditional
d33bs Jun 25, 2024
7f6c055
lines cleanup; remove httpfs install
d33bs Jun 25, 2024
a64c5ba
testing comments
d33bs Jun 25, 2024
ada7701
revert to cloudpathlib ^0.18.0
d33bs Jun 25, 2024
3f760b0
update lockfile
d33bs Jun 25, 2024
11ef34b
prepare test ci for pr
d33bs Jun 25, 2024
dcefa59
split test jobs
d33bs Jun 25, 2024
f7ecf13
remove large data tests from matrix
d33bs Jun 25, 2024
d773992
linting
d33bs Jun 25, 2024
dbc3752
remove dev branch
d33bs Jun 25, 2024
80faae1
correct comments; ci job dep
d33bs Jun 25, 2024
509d29c
Merge remote-tracking branch 'upstream/main' into move-to-real-s3-tests
d33bs Jun 25, 2024
7fdbfab
avoid httpfs plugin error
d33bs Jun 25, 2024
f32f00c
comment about task event trigger
d33bs Jun 29, 2024
faddae4
comment about object storage path
d33bs Jun 29, 2024
8a20e28
limit join data for tests
d33bs Jul 2, 2024
b39c157
linting
d33bs Jul 2, 2024
26 changes: 23 additions & 3 deletions .github/workflows/test.yml
@@ -24,13 +24,14 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Python setup
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python_version }}
# remove poetry.lock file for scheduled tests
# to help simulate possible upstream issues
- name: Remove poetry.lock for scheduled tests
if: github.event_name == 'schedule'
# runs every Wednesday at 7 AM UTC
if: github.event.schedule == '0 7 * * 3'
run: |
rm poetry.lock
- name: Setup for poetry
@@ -40,4 +41,23 @@
- name: Run sphinx-docs build test
run: poetry run sphinx-build docs/source doctest -W
- name: Run pytest
run: poetry run pytest
run: poetry run pytest -m "not large_data_tests"
# run large data tests as a separate job to help
# conserve resources by detecting failure with
# smaller tests first.
run_large_data_tests:
runs-on: ubuntu-22.04
needs: run_tests
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Python setup
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Setup for poetry
uses: ./.github/actions/setup-poetry
- name: Install environment
run: poetry install --no-interaction --no-ansi
- name: Run pytest for large data tests
run: poetry run pytest -m "large_data_tests"
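
Note: the `large_data_tests` marker used above is a standard pytest marker. A minimal sketch of how such a marker might be applied is below; the test names, fixture usage, and marker registration (typically under `[tool.pytest.ini_options]`) are assumptions for illustration, not the PR's actual tests.

```python
import pytest

# Tests without the marker run in the regular matrix job
# (pytest -m "not large_data_tests"); marked tests run in the
# separate run_large_data_tests job (pytest -m "large_data_tests").


def test_convert_small_fixture(tmp_path):
    # placeholder for a fast test exercising small, local fixture data
    assert tmp_path.exists()


@pytest.mark.large_data_tests
def test_convert_real_s3_data(tmp_path):
    # placeholder for a slower test reading real AWS S3 data
    assert tmp_path.exists()
```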
29 changes: 11 additions & 18 deletions cytotable/convert.py
@@ -46,11 +46,12 @@ def _get_table_columns_and_types(
import pathlib

import duckdb
from cloudpathlib import AnyPath

from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet

source_path = source["source_path"]
source_type = str(pathlib.Path(source_path).suffix).lower()
source_type = str(source_path.suffix).lower()

# prepare the data source in the form of a duckdb query
select_source = (
@@ -209,7 +210,7 @@ def _get_table_chunk_offsets(
import pathlib

import duckdb
from cloudpathlib import AnyPath
from cloudpathlib import AnyPath, CloudPath

from cytotable.exceptions import NoInputDataException
from cytotable.utils import _duckdb_reader
@@ -219,18 +220,9 @@
if source is not None:
table_name = source["table_name"] if "table_name" in source.keys() else None
source_path = source["source_path"]
source_type = str(pathlib.Path(source_path).suffix).lower()
source_type = str(source_path.suffix).lower()

try:
# for csv's, check that we have more than one row (a header and data values)
if (
source_type == ".csv"
and sum(1 for _ in AnyPath(source_path).open("r")) <= 1
):
raise NoInputDataException(
f"Data file has 0 rows of values. Error in file: {source_path}"
)

# gather the total rowcount from csv or sqlite data input sources
with _duckdb_reader() as ddb_reader:
rowcount = int(
@@ -322,8 +314,8 @@ def _source_chunk_to_parquet(

# attempt to build dest_path
source_dest_path = (
f"{dest_path}/{str(pathlib.Path(source_group_name).stem).lower()}/"
f"{str(pathlib.Path(source['source_path']).parent.name).lower()}"
f"{dest_path}/{str(AnyPath(source_group_name).stem).lower()}/"
f"{str(source['source_path'].parent.name).lower()}"
)
pathlib.Path(source_dest_path).mkdir(parents=True, exist_ok=True)

@@ -364,11 +356,11 @@ def _source_chunk_to_parquet(

# build output query and filepath base
# (chunked output will append offset to keep output paths unique)
if str(AnyPath(source["source_path"]).suffix).lower() == ".csv":
if str(source["source_path"].suffix).lower() == ".csv":
base_query = f"SELECT {select_columns} FROM read_csv_auto('{str(source['source_path'])}', header=TRUE, delim=',')"
result_filepath_base = f"{source_dest_path}/{str(source['source_path'].stem)}"

elif str(AnyPath(source["source_path"]).suffix).lower() == ".sqlite":
elif str(source["source_path"].suffix).lower() == ".sqlite":
base_query = f"SELECT {select_columns} FROM sqlite_scan('{str(source['source_path'])}', '{str(source['table_name'])}')"
result_filepath_base = f"{source_dest_path}/{str(source['source_path'].stem)}.{source['table_name']}"

@@ -405,7 +397,7 @@ def _source_chunk_to_parquet(
# to handle the mixed types
if (
"Mismatch Type Error" in str(e)
and str(AnyPath(source["source_path"]).suffix).lower() == ".sqlite"
and str(source["source_path"].suffix).lower() == ".sqlite"
):
_write_parquet_table_with_metadata(
# here we use sqlite instead of duckdb to extract
@@ -817,6 +809,7 @@ def _join_source_chunk(
exclude_meta_cols = [
f"c NOT LIKE '{col}%'" for col in list(CYOTABLE_META_COLUMN_TYPES.keys())
]

with _duckdb_reader() as ddb_reader:
result = ddb_reader.execute(
f"""
@@ -1114,7 +1107,7 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
else []
),
**kwargs,
).result()
)

# expand the destination path
expanded_dest_path = _expand_path(path=dest_path)
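
A recurring change in this file is dropping `pathlib.Path(source_path)` wrappers in favor of direct attribute access, since source paths are now carried as `AnyPath` objects that may be local or cloud paths. A minimal sketch of the idea (the S3 URI below is hypothetical):

```python
from cloudpathlib import AnyPath

# AnyPath dispatches to pathlib.Path for local paths and to a CloudPath
# subclass (e.g. S3Path) for cloud URIs, so attributes like .suffix,
# .stem, and .parent behave the same for both.
local = AnyPath("/tmp/plate_1/cells.csv")
remote = AnyPath("s3://example-bucket/plate_1/cells.csv")  # hypothetical bucket

print(local.suffix, remote.suffix)            # ".csv .csv"
print(local.parent.name, remote.parent.name)  # "plate_1 plate_1"
```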
48 changes: 48 additions & 0 deletions cytotable/presets.py
@@ -85,6 +85,54 @@
AND per_nuclei.Nuclei_Number_Object_Number = per_cytoplasm.Cytoplasm_Parent_Nuclei
""",
},
"cellprofiler_sqlite_cpg0016_jump": {
# version specifications using related references
"CONFIG_SOURCE_VERSION": {
"cellprofiler": "v4.0.0",
},
# names of source table compartments (for ex. cells.csv, etc.)
"CONFIG_NAMES_COMPARTMENTS": ("cells", "nuclei", "cytoplasm"),
# names of source table metadata (for ex. image.csv, etc.)
"CONFIG_NAMES_METADATA": ("image",),
# column names in any compartment or metadata tables which contain
# unique names to avoid renaming
"CONFIG_IDENTIFYING_COLUMNS": (
"ImageNumber",
"ObjectNumber",
"Metadata_Well",
"Metadata_Plate",
"Parent_Cells",
"Parent_Nuclei",
),
# chunk size to use for join operations to help with possible performance issues
# note: this number is an estimate and may need changes contingent on the data
# and system used by this library.
"CONFIG_CHUNK_SIZE": 1000,
# compartment and metadata joins performed using DuckDB SQL
# and modified at runtime as needed
"CONFIG_JOINS": """
SELECT
image.Image_TableNumber,
image.Metadata_ImageNumber,
image.Metadata_Plate,
image.Metadata_Well,
image.Image_Metadata_Site,
image.Image_Metadata_Row,
cytoplasm.* EXCLUDE (Metadata_ImageNumber),
cells.* EXCLUDE (Metadata_ImageNumber),
nuclei.* EXCLUDE (Metadata_ImageNumber)
FROM
read_parquet('cytoplasm.parquet') AS cytoplasm
LEFT JOIN read_parquet('cells.parquet') AS cells ON
cells.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
AND cells.Metadata_ObjectNumber = cytoplasm.Cytoplasm_Parent_Cells
LEFT JOIN read_parquet('nuclei.parquet') AS nuclei ON
nuclei.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
AND nuclei.Metadata_ObjectNumber = cytoplasm.Cytoplasm_Parent_Nuclei
LEFT JOIN read_parquet('image.parquet') AS image ON
image.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
""",
},
"cellprofiler_sqlite_pycytominer": {
# version specifications using related references
"CONFIG_SOURCE_VERSION": {
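
For reference, a preset such as the new `cellprofiler_sqlite_cpg0016_jump` entry is selected by name through CytoTable's convert API. A minimal usage sketch is below; the bucket, filename, and destination path are hypothetical and shown only to illustrate how the preset would be invoked against S3-hosted SQLite data.

```python
import cytotable

# Hypothetical invocation: source_path points at an example SQLite file
# in object storage; dest_path and the bucket name are illustrative only.
result = cytotable.convert(
    source_path="s3://example-bucket/cpg0016/BR00116991.sqlite",
    dest_path="./BR00116991.parquet",
    dest_datatype="parquet",
    preset="cellprofiler_sqlite_cpg0016_jump",
    chunk_size=1000,
)
```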
61 changes: 45 additions & 16 deletions cytotable/sources.py
@@ -7,13 +7,11 @@
from typing import Any, Dict, List, Optional, Union

from cloudpathlib import AnyPath
from parsl.app.app import join_app, python_app

from cytotable.exceptions import NoInputDataException

@python_app
def _build_path(
path: Union[str, pathlib.Path, AnyPath], **kwargs
) -> Union[pathlib.Path, AnyPath]:

def _build_path(path: str, **kwargs) -> Union[pathlib.Path, AnyPath]:
"""
Build a path client or return local path.

@@ -43,10 +41,9 @@ def _build_path(
return processed_path


@python_app
def _get_source_filepaths(
path: Union[pathlib.Path, AnyPath],
targets: List[str],
targets: Optional[List[str]] = None,
source_datatype: Optional[str] = None,
) -> Dict[str, List[Dict[str, Any]]]:
"""
@@ -75,7 +72,7 @@ def _get_source_filepaths(

if (targets is None or targets == []) and source_datatype is None:
raise DatatypeException(
f"A source_datatype must be specified when using undefined compartments and metadata names."
"A source_datatype must be specified when using undefined compartments and metadata names."
)

# gathers files from provided path using compartments + metadata as a filter
@@ -87,9 +84,9 @@
for subpath in (
(path,)
# used if the source path is a single file
if AnyPath(path).is_file()
if path.is_file()
# iterates through a source directory
else (x for x in AnyPath(path).glob("**/*") if AnyPath(x).is_file())
else (x for x in path.glob("**/*") if x.is_file())
)
# ensure the subpaths meet certain specifications
if (
@@ -129,7 +126,8 @@ def _get_source_filepaths(
.arrow()["table_name"]
.to_pylist()
# make sure the table names match with compartment + metadata names
if any(target.lower() in table_name.lower() for target in targets)
if targets is not None
and any(target.lower() in table_name.lower() for target in targets)
]
else:
# if we don't have sqlite source, append the existing element
@@ -181,7 +179,6 @@ def _get_source_filepaths(
return grouped_sources


@python_app
def _infer_source_datatype(
sources: Dict[str, List[Dict[str, Any]]], source_datatype: Optional[str] = None
) -> str:
Expand Down Expand Up @@ -230,7 +227,6 @@ def _infer_source_datatype(
return source_datatype


@python_app
def _filter_source_filepaths(
sources: Dict[str, List[Dict[str, Any]]], source_datatype: str
) -> Dict[str, List[Dict[str, Any]]]:
Expand Down Expand Up @@ -260,12 +256,45 @@ def _filter_source_filepaths(
if file["source_path"].stat().st_size > 0
# ensure the datatype matches the source datatype
and file["source_path"].suffix == f".{source_datatype}"
and _file_is_more_than_one_line(path=file["source_path"])
]
for filegroup, files in sources.items()
}


@join_app
def _file_is_more_than_one_line(path: Union[pathlib.Path, AnyPath]) -> bool:
"""
Check if the file has more than one line.

Args:
path (Union[pathlib.Path, AnyPath]):
The path to the file.

Returns:
bool:
True if the file has more than one line, False otherwise.

Raises:
NoInputDataException: If the file has zero lines.
"""

# if we don't have a sqlite file
# (we can't check sqlite files for lines)
if path.suffix.lower() != ".sqlite":
with path.open("r") as f:
try:
# read two lines, if the second is empty return false
return bool(f.readline() and f.readline())

except StopIteration:
# If we encounter the end of the file, it has only one line
raise NoInputDataException(
f"Data file has 0 rows of values. Error in file: {path}"
)
else:
return True


def _gather_sources(
source_path: str,
source_datatype: Optional[str] = None,
@@ -295,11 +324,11 @@ def _gather_sources(
_infer_source_datatype,
)

source_path = _build_path(path=source_path, **kwargs)
built_path = _build_path(path=source_path, **kwargs)

# gather filepaths which will be used as the basis for this work
sources = _get_source_filepaths(
path=source_path, targets=targets, source_datatype=source_datatype
path=built_path, targets=targets, source_datatype=source_datatype
)

# infer or validate the source datatype based on source filepaths
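
The removal of the `@python_app`/`@join_app` decorators in this file (and the matching `.result()` removal in convert.py above) changes source gathering from Parsl futures to direct, synchronous calls. A minimal sketch of the distinction, using a toy function rather than CytoTable's own:

```python
import parsl
from parsl.app.app import python_app
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor

parsl.load(Config(executors=[ThreadPoolExecutor()]))


@python_app
def as_parsl_app(x: int) -> int:
    return x + 1


def as_plain_function(x: int) -> int:
    return x + 1


# A Parsl app call returns an AppFuture that must be resolved with .result(),
# whereas a plain function call returns its value directly -- which is why the
# corresponding .result() call was dropped from _to_parquet in convert.py.
assert as_parsl_app(1).result() == 2
assert as_plain_function(1) == 2
```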
19 changes: 12 additions & 7 deletions cytotable/utils.py
@@ -149,6 +149,10 @@ def _duckdb_reader() -> duckdb.DuckDBPyConnection:
INSTALL sqlite_scanner;
LOAD sqlite_scanner;

/* Install httpfs plugin to avoid error
https://github.com/duckdb/duckdb/issues/3243 */
INSTALL httpfs;

/*
Set threads available to duckdb
See the following for more information:
@@ -322,7 +326,7 @@ def _sqlite_affinity_data_type_lookup(col_type: str) -> str:
return pa.Table.from_pylist(results)


def _cache_cloudpath_to_local(path: Union[str, AnyPath]) -> pathlib.Path:
def _cache_cloudpath_to_local(path: AnyPath) -> pathlib.Path:
"""
Takes a cloudpath and uses cache to convert to a local copy
for use in scenarios where remote work is not possible (sqlite).
@@ -337,24 +341,25 @@ def _cache_cloudpath_to_local(path: Union[str, AnyPath]) -> pathlib.Path:
A local pathlib.Path to cached version of cloudpath file.
"""

candidate_path = AnyPath(path)

# check that the path is a file (caching won't work with a dir)
# and check that the file is of sqlite type
# (other file types will be handled remotely in cloud)
if candidate_path.is_file() and candidate_path.suffix.lower() == ".sqlite":
if (
isinstance(path, CloudPath)
and path.is_file()
and path.suffix.lower() == ".sqlite"
):
try:
# update the path to be the local filepath for reference in CytoTable ops
# note: incurs a data read which will trigger caching of the file
path = CloudPath(path).fspath
path = pathlib.Path(path.fspath)
except InvalidPrefixError:
# share information about not finding a cloud path
logger.info(
"Did not detect a cloud path based on prefix. Defaulting to use local path operations."
)

# cast the result as a pathlib.Path
return pathlib.Path(path)
return path


def _arrow_type_cast_if_specified(
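
The `_cache_cloudpath_to_local` change above narrows local caching to `CloudPath` SQLite inputs. As background, a minimal sketch of the cloudpathlib behavior it relies on (the bucket and filename are hypothetical):

```python
import pathlib

from cloudpathlib import CloudPath

# Accessing .fspath on a CloudPath downloads the object into cloudpathlib's
# local cache (if not already cached) and returns the local filesystem path,
# which is what SQLite-based reads require.
remote = CloudPath("s3://example-bucket/plate_1/plate_1.sqlite")  # hypothetical
local = pathlib.Path(remote.fspath)
print(local.exists())
```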