
Revert to using pandas based batched query, removing dask as a depend… #4

Merged (3 commits) on Nov 2, 2023
40 changes: 39 additions & 1 deletion README.md
@@ -8,7 +8,9 @@ cyclops-query
[![codecov](https://codecov.io/gh/VectorInstitute/cyclops-query/branch/main/graph/badge.svg)](https://codecov.io/gh/VectorInstitute/cyclops-query)
[![license](https://img.shields.io/github/license/VectorInstitute/cyclops-query.svg)](https://github.com/VectorInstitute/cyclops-query/blob/main/LICENSE)

``cyclops-query`` is a tool for querying EHR databases.
``cyclops-query`` is a tool for querying relational databases using a simple Python API. It was developed specifically for querying Electronic Health Record (EHR) databases. The tool is a wrapper around [SQLAlchemy](https://www.sqlalchemy.org/) and can be used to write SQL-like queries in Python, including joins, conditions, group-by aggregations, and more.

## 🐣 Getting Started

@@ -18,6 +20,42 @@ cyclops-query
python3 -m pip install cycquery
```

### Query a PostgreSQL database

```python
from cycquery import DatasetQuerier
import cycquery.ops as qo


querier = DatasetQuerier(
    dbms="postgresql",
    port=5432,
    host="localhost",
    database="dbname",
    user="username",
    password="password",
)
# List all tables.
querier.list_tables()

# Get some table.
table = querier.schema.sometable()
# Filter based on some condition (e.g. substring match).
table = table.ops(qo.ConditionSubstring("col1", "substr"))
# Run query to get data as a pandas dataframe.
df = table.run()

# Create a sequential list of operations to perform on the query.
ops = qo.Sequential(
    qo.ConditionIn("col2", [1, 2]),
    qo.DropNulls("col3"),
    qo.Distinct("col1"),
)
table = table.ops(ops)
# Run query to get data as a pandas dataframe.
df = table.run()
```
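
The description above also mentions joins and group-by aggregation, which the example does not cover. Below is a minimal sketch of how those might look with the same `querier`; the table names (`visits`, `labs`), the column names, and the exact keyword arguments of `qo.Join` and `qo.GroupByAggregate` are illustrative assumptions rather than something taken from this diff.

```python
# Hypothetical tables and columns, used only for illustration.
visits = querier.schema.visits()
labs = querier.schema.labs()

# Join lab results onto visits on a shared key column (assumed op signature).
joined = visits.ops(qo.Join(labs, on="visit_id"))
# Aggregate the joined table, e.g. mean lab value per visit (assumed op signature).
joined = joined.ops(qo.GroupByAggregate("visit_id", {"lab_value": "mean"}))
df = joined.run()
```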

## 🧑🏿‍💻 Developing

### Using poetry
45 changes: 27 additions & 18 deletions cycquery/interface.py
@@ -1,9 +1,8 @@
"""A query interface class to wrap database objects and queries."""

import logging
from typing import List, Literal, Optional, Tuple, Union
from typing import Generator, List, Literal, Optional, Tuple, Union

import dask.dataframe as dd
import pandas as pd
from sqlalchemy.sql.elements import BinaryExpression

@@ -46,7 +45,7 @@
        self._data = None

    @property
    def data(self) -> Optional[Union[pd.DataFrame, dd.core.DataFrame]]:
    def data(self) -> Union[pd.DataFrame, None]:
        """Get data."""
        return self._data
Expand Down Expand Up @@ -176,10 +175,10 @@
    def run(
        self,
        limit: Optional[int] = None,
        backend: Literal["pandas", "dask", "datasets"] = "pandas",
        index_col: Optional[str] = None,
        n_partitions: Optional[int] = None,
    ) -> Union[pd.DataFrame, dd.core.DataFrame]:
        batch_mode: bool = False,
        batch_size: int = 1000000,
    ) -> Union[pd.DataFrame, Generator[pd.DataFrame, None, None]]:
        """Run the query, and fetch data.

        Parameters
@@ -191,22 +190,29 @@
        index_col
            Column which becomes the index, and defines the partitioning.
            Should be an indexed column in the SQL server, and any orderable type.
        n_partitions
            Number of partitions. Check dask documentation for additional details.
        batch_mode
            Whether to run the query in batch mode. A generator is returned if True.
        batch_size
            Batch size for the query, default 1 million rows.

        Returns
        -------
        pandas.DataFrame or dask.DataFrame or datasets.Dataset
        pandas.DataFrame or Generator[pandas.DataFrame, None, None]
            Query result.

        """
        self._data = self.database.run_query(
            self.query,
            limit=limit,
            backend=backend,
            index_col=index_col,
            n_partitions=n_partitions,
        )
        if not batch_mode:
            self._data = self.database.run_query(
                self.query,
                limit=limit,
                index_col=index_col,
            )
        else:
            self._data = self.database.run_query_batch(
                self.query,
                index_col=index_col,
                batch_size=batch_size,
            )

        return self._data

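For context, a short usage sketch of the new batched interface; `table` here stands for any query table built as in the README example above, and only the `batch_mode` and `batch_size` parameters introduced in this diff are used.

```python
# Run the query in batches of 100,000 rows; a generator of DataFrames is returned.
batches = table.run(batch_mode=True, batch_size=100_000)
for i, batch_df in enumerate(batches):
    # Each item yielded is an ordinary pandas DataFrame.
    print(i, len(batch_df))
```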
@@ -232,8 +238,11 @@
"""
# If the query was already run.
if self._data is not None:
return save_dataframe(self._data, path, file_format=file_format)

if isinstance(self._data, pd.DataFrame):
return save_dataframe(self._data, path, file_format=file_format)
if isinstance(self._data, Generator):
for i, df in enumerate(self._data):
save_dataframe(df, f"{path}/batch-{i:03d}", file_format=file_format)

        # Save without running.
        if file_format == "csv":
            path = self.database.save_query_to_csv(self.query, path)
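A companion sketch of the batched save path shown above; it assumes the surrounding method is the interface's `save`, that `path` may name a directory when the data is a generator of batches, and that `"parquet"` is an accepted `file_format`.

```python
# Run in batch mode, then save; each batch would be written as path/batch-000, batch-001, ...
table.run(batch_mode=True, batch_size=500_000)
table.save("output_dir", file_format="parquet")
```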
40 changes: 40 additions & 0 deletions cycquery/ops.py
@@ -3260,3 +3260,43 @@
        table = _process_checks(table, cols=cols)

        return select(table).distinct(*get_columns(table, cols)).subquery()


class Count(QueryOp):
    """Count the number of rows.

    Parameters
    ----------
    col
        Column to count.

    Examples
    --------
    >>> Count("person_id")(table)

    """

    def __init__(self, col: str):
        super().__init__()
        self.col = col

    def __call__(self, table: TableTypes) -> Subquery:
        """Process the table.

        Parameters
        ----------
        table
            Table on which to perform the operation.

        Returns
        -------
        sqlalchemy.sql.selectable.Subquery
            Processed table.

        """
        table = _process_checks(table, cols=self.col)
        count = func.count(get_column(table, self.col))
        return select(count).subquery()
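A brief usage sketch of the new `Count` op, reusing the hypothetical table and column names from the README example; running the query should return a single-row result holding the row count.

```python
# Count rows of a (hypothetical) table after applying the op.
count_table = querier.schema.sometable().ops(qo.Count("col1"))
count_df = count_table.run()  # one-row pandas DataFrame containing the count
```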
