From 0f25a4d0bbd25f1f0f9dd11d51d40e5f38bf63fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Wiewi=C3=B3rka?= Date: Sat, 21 Dec 2024 08:51:05 +0100 Subject: [PATCH] feat: Custom column names and suffixes for overlap and nearest operations (#43) * doc: Installation instructions * chore: Readme refactor * feat: Add support for custom column names and suffixes * Fixing needless borrow * Removing assertion and adding test case for non-default suffixes * Creating release 0.3.0 --- .github/workflows/publish_to_pypi.yml | 1 + .pre-commit-config.yaml | 11 ++ Cargo.lock | 2 +- Cargo.toml | 2 +- README.md | 63 +------ docs/notebooks/tutorial.ipynb | 146 ++++++++++++-- polars_bio/range_op.py | 34 ++-- polars_bio/range_op_helpers.py | 27 +-- polars_bio/range_op_io.py | 6 +- polars_bio/range_viz.py | 4 +- pyproject.toml | 2 +- src/lib.rs | 261 ++++++++++++++++++++------ tests/test_bioframe.py | 9 +- tests/test_native.py | 4 + tests/test_pandas.py | 4 + tests/test_polars.py | 26 ++- 16 files changed, 421 insertions(+), 181 deletions(-) diff --git a/.github/workflows/publish_to_pypi.yml b/.github/workflows/publish_to_pypi.yml index 29287e4..0437ce3 100644 --- a/.github/workflows/publish_to_pypi.yml +++ b/.github/workflows/publish_to_pypi.yml @@ -11,6 +11,7 @@ on: - 'docs/**' - 'benchmark/**' - 'mkdocs.yml' + - 'README.md' pull_request: workflow_dispatch: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6352607..3fabbaa 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -29,3 +29,14 @@ repos: hooks: - id: fmt - id: cargo-check + +### FIXME +# - repo: https://github.com/ddkasa/check-mkdocs.git +# rev: 65e819a4c62ee22c38f244b51b63f2f9b89a66d0 +# hooks: +# - id: check-mkdocs +# name: check-mkdocs +# args: ["--config", "mkdocs.yml"] # Optional, mkdocs.yml is the default +# # If you have additional plugins or libraries that are not included in +# # check-mkdocs, add them here +# additional_dependencies: ['mkdocs-material', 'mkdocs-jupyter', 'mkdocstrings-python'] \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index a23c351..760ef6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2734,7 +2734,7 @@ checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" [[package]] name = "polars_bio" -version = "0.2.11" +version = "0.2.12" dependencies = [ "arrow", "datafusion", diff --git a/Cargo.toml b/Cargo.toml index 37951f6..bbfbc7b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "polars_bio" -version = "0.2.11" +version = "0.3.0" edition = "2021" [lib] diff --git a/README.md b/README.md index e0f1347..4f906e3 100644 --- a/README.md +++ b/README.md @@ -1,59 +1,10 @@ -# polars_bio +# polars-bio - Next-gen Python DataFrame operations for genomics! +![CI](https://github.com/biodatageeks/polars-bio/actions/workflows/publish_to_pypi.yml/badge.svg?branch=master) +![Docs](https://github.com/biodatageeks/polars-bio/actions/workflows/publish_documentation.yml/badge.svg?branch=master) +![logo](docs/assets/logo-large.png) -## Features +[polars-bio](https://pypi.org/project/polars-bio/) is a Python library for genomics built on top of [polars](https://pola.rs/), [Apache Arrow](https://arrow.apache.org/) and [Apache DataFusion](https://datafusion.apache.org/). +It provides a DataFrame API for genomics data and is designed to be blazing fast, memory efficient and easy to use. -## Genomic ranges operations - -| Features | Bioframe | polars-bio | PyRanges | Pybedtools | PyGenomics | GenomicRanges | -|--------------|--------------------|---------------------|--------------------|--------------------|--------------------|--------------------| -| overlap | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: | -| nearest | :white_check_mark: | :white_check_mark: | :white_check_mark: | | | | -| cluster | :white_check_mark: | | | | | | -| merge | :white_check_mark: | | | | | | -| complement | :white_check_mark: | | | | | | -| select/slice | :white_check_mark: | | | | | | -| | | | | | | | -| coverage | :white_check_mark: | | | | | | -| expand | :white_check_mark: | | | | | | -| sort | :white_check_mark: | | | | | | - - -## Input/Output -| I/O | Bioframe | polars-bio | PyRanges | Pybedtools | PyGenomics | GenomicRanges | -|------------------|--------------------|------------------------|--------------------|------------|------------|---------------| -| Pandas DataFrame | :white_check_mark: | :white_check_mark: | :white_check_mark: | | | | -| Polars DataFrame | | :white_check_mark: | | | | | -| Polars LazyFrame | | :white_check_mark: | | | | | -| Native readers | | :white_check_mark: | | | | | - - -## Genomic file format -| I/O | Bioframe | polars-bio | PyRanges | Pybedtools | PyGenomics | GenomicRanges | -|----------------|--------------------|------------|--------------------|------------|------------|---------------| -| BED | :white_check_mark: | | :white_check_mark: | | | | -| BAM | | | | | | | -| VCF | | | | | | | - - -## Performance -![img.png](benchmark/results-overlap-0.1.1.png) - -![img.png](benchmark/results-overlap-df-0.1.1.png) - -![img.png](benchmark/results-nearest-0.1.1.png) - -## Remarks - -Pyranges is multithreaded, but : - -* Requires Ray backend plus -```bash - nb_cpu: int, default 1 - - How many cpus to use. Can at most use 1 per chromosome or chromosome/strand tuple. - Will only lead to speedups on large datasets. -``` - -* for nearest returns no empty rows if there is no overlap (we follow Bioframe where nulls are returned) -# \ No newline at end of file +Read the [documentation](https://biodatageeks.github.io/polars-bio/) \ No newline at end of file diff --git a/docs/notebooks/tutorial.ipynb b/docs/notebooks/tutorial.ipynb index 1ad2000..f7c8335 100644 --- a/docs/notebooks/tutorial.ipynb +++ b/docs/notebooks/tutorial.ipynb @@ -9,14 +9,19 @@ { "cell_type": "code", "id": "7b173024d3e8f76", - "metadata": {}, + "metadata": { + "ExecuteTime": { + "end_time": "2024-12-20T19:12:04.948380Z", + "start_time": "2024-12-20T19:12:04.544324Z" + } + }, "source": [ "import polars_bio as pb\n", "import pandas as pd\n", "from polars_bio.range_viz import visualize_intervals" ], "outputs": [], - "execution_count": null + "execution_count": 1 }, { "cell_type": "markdown", @@ -27,19 +32,24 @@ { "cell_type": "code", "id": "86fe039c3780140e", - "metadata": {}, + "metadata": { + "ExecuteTime": { + "end_time": "2024-12-20T19:12:06.150356Z", + "start_time": "2024-12-20T19:12:06.145095Z" + } + }, "source": [ "df1 = pd.DataFrame(\n", " [[\"chr1\", 1, 5], [\"chr1\", 3, 8], [\"chr1\", 8, 10], [\"chr1\", 12, 14]],\n", - " columns=[\"contig\", \"pos_start\", \"pos_end\"],\n", + " columns=[\"chrom\", \"start\", \"end\"],\n", ")\n", "\n", "df2 = pd.DataFrame(\n", - " [[\"chr1\", 4, 8], [\"chr1\", 10, 11]], columns=[\"contig\", \"pos_start\", \"pos_end\"]\n", + " [[\"chr1\", 4, 8], [\"chr1\", 10, 11]], columns=[\"chrom\", \"start\", \"end\"]\n", ")" ], "outputs": [], - "execution_count": null + "execution_count": 2 }, { "cell_type": "markdown", @@ -50,32 +60,138 @@ { "cell_type": "code", "id": "304f3aa6fcdc9650", - "metadata": {}, + "metadata": { + "ExecuteTime": { + "end_time": "2024-12-20T19:12:08.303754Z", + "start_time": "2024-12-20T19:12:08.294952Z" + } + }, "source": [ "overlapping_intervals = pb.overlap(df1, df2, output_type=\"pandas.DataFrame\")" ], - "outputs": [], - "execution_count": null + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:polars_bio:Running Overlap operation with algorithm Coitrees and 1 thread(s)...\n" + ] + } + ], + "execution_count": 3 }, { "cell_type": "code", "id": "61c9254622598622", - "metadata": {}, + "metadata": { + "ExecuteTime": { + "end_time": "2024-12-20T19:12:12.727046Z", + "start_time": "2024-12-20T19:12:12.719803Z" + } + }, "source": [ "display(overlapping_intervals)" ], - "outputs": [], - "execution_count": null + "outputs": [ + { + "data": { + "text/plain": [ + " contig_1 pos_start_1 pos_end_1 contig_2 pos_start_2 pos_end_2\n", + "0 chr1 1 5 chr1 4 8\n", + "1 chr1 3 8 chr1 4 8" + ], + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
contig_1pos_start_1pos_end_1contig_2pos_start_2pos_end_2
0chr115chr148
1chr138chr148
\n", + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "execution_count": 4 }, { "cell_type": "code", "id": "e640901ec6e6ce11", - "metadata": {}, + "metadata": { + "ExecuteTime": { + "end_time": "2024-12-20T19:12:14.857698Z", + "start_time": "2024-12-20T19:12:14.791600Z" + } + }, "source": [ "visualize_intervals(overlapping_intervals)" ], - "outputs": [], - "execution_count": null + "outputs": [ + { + "data": { + "text/plain": [ + "
" + ], + "image/png": "" + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "
" + ], + "image/png": "" + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "execution_count": 5 }, { "cell_type": "markdown", diff --git a/polars_bio/range_op.py b/polars_bio/range_op.py index 1676957..6c14c53 100644 --- a/polars_bio/range_op.py +++ b/polars_bio/range_op.py @@ -10,7 +10,7 @@ pass from polars_bio.polars_bio import FilterOp, RangeOp, RangeOptions -DEFAULT_INTERVAL_COLUMNS = ["contig", "pos_start", "pos_end"] +DEFAULT_INTERVAL_COLUMNS = ["chrom", "start", "end"] ctx = Context().ctx @@ -20,7 +20,7 @@ def overlap( df2: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], how: str = "inner", overlap_filter: FilterOp = FilterOp.Strict, - suffixes: tuple[str] = ("_1", "_2"), + suffixes: tuple[str, str] = ("_1", "_2"), on_cols=None, col1: Union[list[str], None] = None, col2: Union[list[str], None] = None, @@ -85,19 +85,23 @@ def overlap( _validate_overlap_input(col1, col2, on_cols, suffixes, output_type, how) - col1 = ["contig", "pos_start", "pos_end"] if col1 is None else col1 - col2 = ["contig", "pos_start", "pos_end"] if col2 is None else col2 - range_options = RangeOptions(range_op=RangeOp.Overlap, filter_op=overlap_filter) - return range_operation( - df1, df2, suffixes, range_options, col1, col2, output_type, ctx + col1 = DEFAULT_INTERVAL_COLUMNS if col1 is None else col1 + col2 = DEFAULT_INTERVAL_COLUMNS if col2 is None else col2 + range_options = RangeOptions( + range_op=RangeOp.Overlap, + filter_op=overlap_filter, + suffixes=suffixes, + columns_1=col1, + columns_2=col2, ) + return range_operation(df1, df2, range_options, output_type, ctx) def nearest( df1: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], df2: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], overlap_filter: FilterOp = FilterOp.Strict, - suffixes: (str, str) = ("_1", "_2"), + suffixes: tuple[str, str] = ("_1", "_2"), on_cols: Union[list[str], None] = None, col1: Union[list[str], None] = None, col2: Union[list[str], None] = None, @@ -136,9 +140,13 @@ def nearest( _validate_overlap_input(col1, col2, on_cols, suffixes, output_type, how="inner") - col1 = ["contig", "pos_start", "pos_end"] if col1 is None else col1 - col2 = ["contig", "pos_start", "pos_end"] if col2 is None else col2 - range_options = RangeOptions(range_op=RangeOp.Nearest, filter_op=overlap_filter) - return range_operation( - df1, df2, suffixes, range_options, col1, col2, output_type, ctx + col1 = DEFAULT_INTERVAL_COLUMNS if col1 is None else col1 + col2 = DEFAULT_INTERVAL_COLUMNS if col2 is None else col2 + range_options = RangeOptions( + range_op=RangeOp.Nearest, + filter_op=overlap_filter, + suffixes=suffixes, + columns_1=col1, + columns_2=col2, ) + return range_operation(df1, df2, range_options, output_type, ctx) diff --git a/polars_bio/range_op_helpers.py b/polars_bio/range_op_helpers.py index 519206e..caaa1cd 100644 --- a/polars_bio/range_op_helpers.py +++ b/polars_bio/range_op_helpers.py @@ -39,10 +39,7 @@ def __init__(self): def range_operation( df1: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], df2: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], - suffixes: tuple[str, str], range_options: RangeOptions, - col1: list[str], - col2: list[str], output_type: str, ctx: BioSessionContext, ) -> Union[pl.LazyFrame, pl.DataFrame, pd.DataFrame]: @@ -56,8 +53,8 @@ def range_operation( ext2 == ".parquet" or ext2 == ".csv" ), "Dataframe1 must be a Parquet or CSV file" # use suffixes to avoid column name conflicts - df_schema1 = _get_schema(df1, suffixes[0]) - df_schema2 = _get_schema(df2, suffixes[1]) + df_schema1 = _get_schema(df1, range_options.suffixes[0]) + df_schema2 = _get_schema(df2, range_options.suffixes[1]) merged_schema = pl.Schema({**df_schema1, **df_schema2}) if output_type == "polars.LazyFrame": return range_lazy_scan( @@ -66,8 +63,6 @@ def range_operation( merged_schema, range_options=range_options, ctx=ctx, - col1=col1, - col2=col2, ) elif output_type == "polars.DataFrame": return range_operation_scan(ctx, df1, df2, range_options).to_polars() @@ -88,13 +83,11 @@ def range_operation( if output_type == "polars.LazyFrame": merged_schema = pl.Schema( { - **_rename_columns(df1, suffixes[0]).schema, - **_rename_columns(df2, suffixes[1]).schema, + **_rename_columns(df1, range_options.suffixes[0]).schema, + **_rename_columns(df2, range_options.suffixes[1]).schema, } ) - return range_lazy_scan( - df1, df2, merged_schema, col1, col2, range_options, ctx - ) + return range_lazy_scan(df1, df2, merged_schema, range_options, ctx) elif output_type == "polars.DataFrame": if isinstance(df1, pl.DataFrame) and isinstance(df2, pl.DataFrame): df1 = df1.to_arrow().to_reader() @@ -106,8 +99,8 @@ def range_operation( return range_operation_frame(ctx, df1, df2, range_options).to_polars() elif output_type == "pandas.DataFrame": if isinstance(df1, pd.DataFrame) and isinstance(df2, pd.DataFrame): - df1 = _df_to_arrow(df1, col1[0]).to_reader() - df2 = _df_to_arrow(df2, col2[0]).to_reader() + df1 = _df_to_arrow(df1, range_options.columns_1[0]).to_reader() + df2 = _df_to_arrow(df2, range_options.columns_2[0]).to_reader() else: raise ValueError( "Input and output dataframes must be of the same type: either polars or pandas" @@ -120,14 +113,8 @@ def range_operation( def _validate_overlap_input(col1, col2, on_cols, suffixes, output_type, how): - # TODO: Add support for col1 and col2 - assert col1 is None, "col1 is not supported yet" - assert col2 is None, "col2 is not supported yet" - # TODO: Add support for on_cols () assert on_cols is None, "on_cols is not supported yet" - - assert suffixes == ("_1", "_2"), "Only default suffixes are supported" assert output_type in [ "polars.LazyFrame", "polars.DataFrame", diff --git a/polars_bio/range_op_io.py b/polars_bio/range_op_io.py index 5b5c035..e9fecbf 100644 --- a/polars_bio/range_op_io.py +++ b/polars_bio/range_op_io.py @@ -20,8 +20,6 @@ def range_lazy_scan( df_1: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], df_2: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], schema: pl.Schema, - col1: list[str], - col2: list[str], range_options: RangeOptions, ctx: BioSessionContext, ) -> pl.LazyFrame: @@ -34,8 +32,8 @@ def range_lazy_scan( df_2 = df_2.to_arrow().to_reader() elif isinstance(df_1, pd.DataFrame) and isinstance(df_2, pd.DataFrame): range_function = range_operation_frame - df_1 = _df_to_arrow(df_1, col1[0]).to_reader() - df_2 = _df_to_arrow(df_2, col2[0]).to_reader() + df_1 = _df_to_arrow(df_1, range_options.columns_1[0]).to_reader() + df_2 = _df_to_arrow(df_2, range_options.columns_2[0]).to_reader() else: raise ValueError("Only polars and pandas dataframes are supported") diff --git a/polars_bio/range_viz.py b/polars_bio/range_viz.py index d08adb7..c146492 100644 --- a/polars_bio/range_viz.py +++ b/polars_bio/range_viz.py @@ -22,8 +22,8 @@ def visualize_intervals( df = df if isinstance(df, pd.DataFrame) else df.to_pandas() for i, reg_pair in df.iterrows(): bf.vis.plot_intervals_arr( - starts=[reg_pair.pos_start_1, reg_pair.pos_start_2], - ends=[reg_pair.pos_end_1, reg_pair.pos_end_2], + starts=[reg_pair.start_1, reg_pair.start_2], + ends=[reg_pair.end_1, reg_pair.end_2], colors=["skyblue", "lightpink"], levels=[2, 1], xlim=(0, 16), diff --git a/pyproject.toml b/pyproject.toml index 8378f16..e02e19a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "maturin" [project] name = "polars-bio" -version = "0.2.11" +version = "0.3.0" description = "Blazing fast genomic operations on large Python dataframes" authors = [] requires-python = ">=3.9" diff --git a/src/lib.rs b/src/lib.rs index 9950ce8..affe2e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +use std::fmt; use std::sync::Arc; use datafusion::arrow::array::RecordBatch; @@ -21,13 +22,20 @@ const RIGHT_TABLE: &str = "s2"; #[pyclass(name = "RangeOptions")] #[derive(Clone)] pub struct RangeOptions { - pub range_op: RangeOp, - pub filter_op: Option, - pub suffixes: Option>, - pub columns_1: Option>, - pub columns_2: Option>, - pub on_cols: Option>, - pub overlap_alg: Option, + #[pyo3(get, set)] + range_op: RangeOp, + #[pyo3(get, set)] + filter_op: Option, + #[pyo3(get, set)] + suffixes: Option<(String, String)>, + #[pyo3(get, set)] + columns_1: Option>, + #[pyo3(get, set)] + columns_2: Option>, + #[pyo3(get, set)] + on_cols: Option>, + #[pyo3(get, set)] + overlap_alg: Option, } #[pymethods] @@ -37,7 +45,7 @@ impl RangeOptions { pub fn new( range_op: RangeOp, filter_op: Option, - suffixes: Option>, + suffixes: Option<(String, String)>, columns_1: Option>, columns_2: Option>, on_cols: Option>, @@ -69,6 +77,19 @@ pub enum RangeOp { Complement = 1, Cluster = 2, Nearest = 3, + Coverage = 4, +} + +impl fmt::Display for RangeOp { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RangeOp::Overlap => write!(f, "Overlap"), + RangeOp::Nearest => write!(f, "Nearest"), + RangeOp::Complement => write!(f, "Complement"), + RangeOp::Cluster => write!(f, "Cluster"), + RangeOp::Coverage => write!(f, "Coverage"), + } + } } pub enum InputFormat { @@ -169,85 +190,189 @@ async fn register_table(ctx: &SessionContext, path: &str, table_name: &str, form } } -async fn do_nearest(ctx: &SessionContext, filter: FilterOp) -> datafusion::dataframe::DataFrame { - info!( - "Running nearest: algorithm {} with {} thread(s)", - ctx.state() - .config() - .options() - .extensions - .get::() - .unwrap() - .interval_join_algorithm, - ctx.state().config().options().execution.target_partitions - ); - let sign = match filter { +async fn do_nearest( + ctx: &SessionContext, + range_opts: RangeOptions, +) -> datafusion::dataframe::DataFrame { + let sign = match range_opts.filter_op.unwrap() { FilterOp::Weak => "=".to_string(), _ => "".to_string(), }; + let suffixes = match range_opts.suffixes { + Some((s1, s2)) => (s1, s2), + _ => ("_1".to_string(), "_2".to_string()), + }; + let columns_1 = match range_opts.columns_1 { + Some(cols) => cols, + _ => vec![ + "contig".to_string(), + "pos_start".to_string(), + "pos_end".to_string(), + ], + }; + let columns_2 = match range_opts.columns_2 { + Some(cols) => cols, + _ => vec![ + "contig".to_string(), + "pos_start".to_string(), + "pos_end".to_string(), + ], + }; + let query = format!( r#" SELECT - a.contig AS contig_1, - a.pos_start AS pos_start_1, - a.pos_end AS pos_end_1, - b.contig AS contig_2, - b.pos_start AS pos_start_2, - b.pos_end AS pos_end_2, + a.{} AS {}{}, -- contig + a.{} AS {}{}, -- pos_start + a.{} AS {}{}, -- pos_end + b.{} AS {}{}, -- contig + b.{} AS {}{}, -- pos_start + b.{} AS {}{}, -- pos_end + a.* except({}, {}, {}), -- all join columns from left table + b.* except({}, {}, {}), -- all join columns from right table CAST( - CASE WHEN b.pos_start >= a.pos_end + CASE WHEN b.{} >= a.{} THEN - abs(b.pos_start-a.pos_end) - WHEN b.pos_end <= a.pos_start + abs(b.{}-a.{}) + WHEN b.{} <= a.{} THEN - abs(a.pos_start-b.pos_end) + abs(a.{}-b.{}) ELSE 0 END AS BIGINT) AS distance FROM {} AS b, {} AS a - WHERE b.contig = a.contig - AND cast(b.pos_end AS INT) >{} cast(a.pos_start AS INT ) - AND cast(b.pos_start AS INT) <{} cast(a.pos_end AS INT) + WHERE b.{} = a.{} + AND cast(b.{} AS INT) >{} cast(a.{} AS INT ) + AND cast(b.{} AS INT) <{} cast(a.{} AS INT) "#, - RIGHT_TABLE, LEFT_TABLE, sign, sign + columns_1[0], + columns_1[0], + suffixes.0, // contig + columns_1[1], + columns_1[1], + suffixes.0, // pos_start + columns_1[2], + columns_1[2], + suffixes.0, // pos_end + columns_2[0], + columns_2[0], + suffixes.1, // contig + columns_2[1], + columns_2[1], + suffixes.1, // pos_start + columns_2[2], + columns_2[2], + suffixes.1, // pos_end + columns_1[0], + columns_1[1], + columns_1[2], // all join columns from right table + columns_2[0], + columns_2[1], + columns_2[2], // all join columns from left table + columns_2[1], + columns_1[2], // b.pos_start >= a.pos_end + columns_2[1], + columns_1[2], // b.pos_start-a.pos_end + columns_2[2], + columns_1[1], // b.pos_end <= a.pos_start + columns_2[2], + columns_1[1], // a.pos_start-b.pos_end + RIGHT_TABLE, + LEFT_TABLE, + columns_1[0], + columns_2[0], // contig + columns_1[2], + sign, + columns_2[1], // pos_start + columns_1[1], + sign, + columns_2[2], // pos_end ); ctx.sql(&query).await.unwrap() } -async fn do_overlap(ctx: &SessionContext, filter: FilterOp) -> datafusion::dataframe::DataFrame { - let sign = match filter { + +async fn do_overlap( + ctx: &SessionContext, + range_opts: RangeOptions, +) -> datafusion::dataframe::DataFrame { + let sign = match range_opts.clone().filter_op.unwrap() { FilterOp::Weak => "=".to_string(), _ => "".to_string(), }; - info!( - "Running overlap: algorithm {} with {} thread(s)", - ctx.state() - .config() - .options() - .extensions - .get::() - .unwrap() - .interval_join_algorithm, - ctx.state().config().options().execution.target_partitions - ); + let suffixes = match range_opts.suffixes { + Some((s1, s2)) => (s1, s2), + _ => ("_1".to_string(), "_2".to_string()), + }; + let columns_1 = match range_opts.columns_1 { + Some(cols) => cols, + _ => vec![ + "contig".to_string(), + "pos_start".to_string(), + "pos_end".to_string(), + ], + }; + let columns_2 = match range_opts.columns_2 { + Some(cols) => cols, + _ => vec![ + "contig".to_string(), + "pos_start".to_string(), + "pos_end".to_string(), + ], + }; let query = format!( r#" SELECT - a.contig as contig_1, - a.pos_start as pos_start_1, - a.pos_end as pos_end_1, - b.contig as contig_2, - b.pos_start as pos_start_2, - b.pos_end as pos_end_2 + a.{} as {}{}, -- contig + a.{} as {}{}, -- pos_start + a.{} as {}{}, -- pos_end + b.{} as {}{}, -- contig + b.{} as {}{}, -- pos_start + b.{} as {}{}, -- pos_end + a.* except({}, {}, {}), -- all join columns from left table + b.* except({}, {}, {}) -- all join columns from right table FROM {} a, {} b WHERE - a.contig=b.contig + a.{}=b.{} AND - cast(a.pos_end AS INT) >{} cast(b.pos_start AS INT) + cast(a.{} AS INT) >{} cast(b.{} AS INT) AND - cast(a.pos_start AS INT) <{} cast(b.pos_end AS INT) + cast(a.{} AS INT) <{} cast(b.{} AS INT) "#, - LEFT_TABLE, RIGHT_TABLE, sign, sign, + columns_1[0], + columns_1[0], + suffixes.0, // contig + columns_1[1], + columns_1[1], + suffixes.0, // pos_start + columns_1[2], + columns_1[2], + suffixes.0, // pos_end + columns_2[0], + columns_2[0], + suffixes.1, // contig + columns_2[1], + columns_2[1], + suffixes.1, // pos_start + columns_2[2], + columns_2[2], + suffixes.1, // pos_end + columns_1[0], + columns_1[1], + columns_1[2], // all join columns from right table + columns_2[0], + columns_2[1], + columns_2[2], // all join columns from left table + LEFT_TABLE, + RIGHT_TABLE, + columns_1[0], + columns_2[0], // contig + columns_1[2], + sign, + columns_2[1], // pos_start + columns_1[1], + sign, + columns_2[2], // pos_end ); ctx.sql(&query).await.unwrap() } @@ -306,12 +431,12 @@ fn do_range_operation( range_options: RangeOptions, ) -> datafusion::dataframe::DataFrame { // defaults - match range_options.overlap_alg { + match &range_options.overlap_alg { Some(alg) if alg == "coitreesnearest" => { panic!("CoitreesNearest is an internal algorithm for nearest operation. Can't be set explicitly."); }, Some(alg) => { - set_option_internal(ctx, "sequila.interval_join_algorithm", &alg); + set_option_internal(ctx, "sequila.interval_join_algorithm", alg); }, _ => { set_option_internal( @@ -321,11 +446,23 @@ fn do_range_operation( ); }, } + info!( + "Running {} operation with algorithm {} and {} thread(s)...", + range_options.range_op, + ctx.state() + .config() + .options() + .extensions + .get::() + .unwrap() + .interval_join_algorithm, + ctx.state().config().options().execution.target_partitions + ); match range_options.range_op { - RangeOp::Overlap => rt.block_on(do_overlap(ctx, range_options.filter_op.unwrap())), + RangeOp::Overlap => rt.block_on(do_overlap(ctx, range_options)), RangeOp::Nearest => { set_option_internal(ctx, "sequila.interval_join_algorithm", "coitreesnearest"); - rt.block_on(do_nearest(ctx, range_options.filter_op.unwrap())) + rt.block_on(do_nearest(ctx, range_options)) }, _ => panic!("Unsupported operation"), } diff --git a/tests/test_bioframe.py b/tests/test_bioframe.py index 752f97d..41c248a 100644 --- a/tests/test_bioframe.py +++ b/tests/test_bioframe.py @@ -6,11 +6,14 @@ from polars_bio.polars_bio import FilterOp -class TestOverlapBioframe: +class TestBioframe: result_overlap = pb.overlap( BIO_PD_DF1, BIO_PD_DF2, + col1=("contig", "pos_start", "pos_end"), + col2=("contig", "pos_start", "pos_end"), output_type="pandas.DataFrame", + suffixes=("_1", "_3"), overlap_filter=FilterOp.Strict, ) result_bio_overlap = bf.overlap( @@ -18,13 +21,15 @@ class TestOverlapBioframe: BIO_PD_DF2, cols1=("contig", "pos_start", "pos_end"), cols2=("contig", "pos_start", "pos_end"), - suffixes=("_1", "_2"), + suffixes=("_1", "_3"), how="inner", ) resust_nearest = pb.nearest( BIO_PD_DF1, BIO_PD_DF2, + col1=("contig", "pos_start", "pos_end"), + col2=("contig", "pos_start", "pos_end"), overlap_filter=FilterOp.Strict, output_type="pandas.DataFrame", ) diff --git a/tests/test_native.py b/tests/test_native.py index 82d20f5..b053545 100644 --- a/tests/test_native.py +++ b/tests/test_native.py @@ -16,6 +16,8 @@ class TestOverlapNative: result_csv = pb.overlap( DF_OVER_PATH1, DF_OVER_PATH2, + col1=("contig", "pos_start", "pos_end"), + col2=("contig", "pos_start", "pos_end"), output_type="pandas.DataFrame", overlap_filter=FilterOp.Weak, ) @@ -35,6 +37,8 @@ class TestNearestNative: result = pb.nearest( DF_NEAREST_PATH1, DF_NEAREST_PATH2, + col1=("contig", "pos_start", "pos_end"), + col2=("contig", "pos_start", "pos_end"), output_type="pandas.DataFrame", overlap_filter=FilterOp.Weak, ) diff --git a/tests/test_pandas.py b/tests/test_pandas.py index 71695a9..f5629d1 100644 --- a/tests/test_pandas.py +++ b/tests/test_pandas.py @@ -16,6 +16,8 @@ class TestOverlapPandas: result = pb.overlap( PD_OVERLAP_DF1, PD_OVERLAP_DF2, + col1=("contig", "pos_start", "pos_end"), + col2=("contig", "pos_start", "pos_end"), output_type="pandas.DataFrame", overlap_filter=FilterOp.Weak, ) @@ -35,6 +37,8 @@ class TestNearestPandas: result = pb.nearest( PD_NEAREST_DF1, PD_NEAREST_DF2, + col1=("contig", "pos_start", "pos_end"), + col2=("contig", "pos_start", "pos_end"), output_type="pandas.DataFrame", overlap_filter=FilterOp.Weak, ) diff --git a/tests/test_polars.py b/tests/test_polars.py index a559c4c..e3b0fd3 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -13,10 +13,20 @@ class TestOverlapPolars: result_frame = pb.overlap( - PL_DF1, PL_DF2, output_type="polars.DataFrame", overlap_filter=FilterOp.Weak + PL_DF1, + PL_DF2, + output_type="polars.DataFrame", + overlap_filter=FilterOp.Weak, + col1=("contig", "pos_start", "pos_end"), + col2=("contig", "pos_start", "pos_end"), ) result_lazy = pb.overlap( - PL_DF1, PL_DF2, output_type="polars.LazyFrame", overlap_filter=FilterOp.Weak + PL_DF1, + PL_DF2, + output_type="polars.LazyFrame", + overlap_filter=FilterOp.Weak, + col1=("contig", "pos_start", "pos_end"), + col2=("contig", "pos_start", "pos_end"), ).collect() expected = PL_DF_OVERLAP @@ -35,10 +45,18 @@ def test_overlap_schema_rows_lazy(self): class TestNearestPolars: result_frame = pb.nearest( - PL_NEAREST_DF1, PL_NEAREST_DF2, output_type="polars.DataFrame" + PL_NEAREST_DF1, + PL_NEAREST_DF2, + output_type="polars.DataFrame", + col1=("contig", "pos_start", "pos_end"), + col2=("contig", "pos_start", "pos_end"), ) result_lazy = pb.nearest( - PL_NEAREST_DF1, PL_NEAREST_DF2, output_type="polars.LazyFrame" + PL_NEAREST_DF1, + PL_NEAREST_DF2, + output_type="polars.LazyFrame", + col1=("contig", "pos_start", "pos_end"), + col2=("contig", "pos_start", "pos_end"), ).collect() expected = PL_DF_NEAREST