Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update gemini query nb #489

Merged
merged 3 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions cyclops/query/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -2915,6 +2915,46 @@ def __call__(self, table: TableTypes) -> Subquery:
return select(table).where(cond).subquery()


class DropEmpty(QueryOp):
    """Filter out rows that have an empty value in any of the given columns.

    Parameters
    ----------
    cols
        Column name, or list of column names, to check. A row is kept only
        when every one of these columns compares as not equal to ``""``.

    Examples
    --------
    >>> DropEmpty("col1")(table)
    >>> DropEmpty(["col1", "col2"])(table)

    """

    def __init__(self, cols: typing.Union[str, typing.List[str]]):
        """Store the column(s) to check; normalization happens on call."""
        super().__init__()
        self.cols = cols

    def __call__(self, table: TableTypes) -> Subquery:
        """Apply the filter to the table.

        Parameters
        ----------
        table
            Table on which to perform the operation.

        Returns
        -------
        sqlalchemy.sql.selectable.Subquery
            Table restricted to rows where none of ``cols`` equals ``""``.

        """
        # Normalize to a list so a single column name and a list of names
        # are handled the same way below.
        self.cols = to_list(self.cols)
        table = _process_checks(table, cols=self.cols)

        # One "column != ''" condition per requested column.
        non_empty_conditions = []
        for col_name in self.cols:
            column = get_column(table, col_name)
            non_empty_conditions.append(not_equals(column, ""))

        # All conditions must hold for a row to be kept.
        filtered = select(table).where(and_(*non_empty_conditions))
        return filtered.subquery()


class Apply(QueryOp):
"""Apply function(s) to column(s).

Expand Down
162 changes: 67 additions & 95 deletions docs/source/tutorials/gemini/query_api.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,46 @@
"name": "stderr",
"output_type": "stream",
"text": [
"2023-03-21 17:09:15,267 \u001b[1;37mINFO\u001b[0m cyclops.query.orm - Database setup, ready to run queries!\n"
"2023-09-21 09:17:45,449 \u001b[1;37mINFO\u001b[0m cyclops.query.orm - Database setup, ready to run queries!\n"
]
},
{
"data": {
"text/plain": [
"['information_schema.sql_implementation_info',\n",
" 'information_schema.sql_parts',\n",
" 'information_schema.sql_sizing',\n",
" 'information_schema.sql_features',\n",
" 'public.ip_administrative',\n",
"['public.lookup_icd10_ca_description',\n",
" 'public.lookup_statcan',\n",
" 'public.lookup_cci',\n",
" 'public.lookup_icd10_ca_to_ccsr',\n",
" 'public.lookup_ip_administrative',\n",
" 'public.lookup_lab_concept',\n",
" 'public.lookup_vitals_concept',\n",
" 'public.lookup_pharmacy_concept',\n",
" 'public.lookup_diagnosis',\n",
" 'public.locality_variables',\n",
" 'public.admdad',\n",
" 'public.derived_variables',\n",
" 'public.diagnosis',\n",
" 'public.vitals',\n",
" 'public.intervention',\n",
" 'public.er_administrative',\n",
" 'public.imaging',\n",
" 'public.pharmacy',\n",
" 'public.ipscu',\n",
" 'public.lookup_phy_characteristics',\n",
" 'public.lab',\n",
" 'public.room_transfer',\n",
" 'public.ip_scu',\n",
" 'public.lookup_ip_administrative',\n",
" 'public.ipintervention',\n",
" 'public.lookup_ccsr',\n",
" 'public.hospital',\n",
" 'public.lookup_er_administrative',\n",
" 'public.blood_transfusion',\n",
" 'public.lookup_pharmacy_route',\n",
" 'public.lookup_transfusion_concept',\n",
" 'public.lookup_ip_scu',\n",
" 'public.lookup_diagnosis',\n",
" 'public.lookup_er_administrative',\n",
" 'public.lookup_imaging',\n",
" 'public.pharmacy',\n",
" 'public.radiology',\n",
" 'public.lookup_transfer',\n",
" 'public.ipdiagnosis',\n",
" 'public.lookup_room_transfer',\n",
" 'public.lookup_imaging']"
" 'public.er',\n",
" 'public.erdiagnosis',\n",
" 'public.erintervention',\n",
" 'public.roomtransfer',\n",
" 'public.transfusion',\n",
" 'public.vitals',\n",
" 'public.lookup_hospital_num']"
]
},
"execution_count": 1,
Expand All @@ -72,12 +82,12 @@
"\n",
"querier = GEMINIQuerier(\n",
" host=\"db.gemini-hpc.ca\",\n",
" database=\"delirium_v3_0_0\",\n",
" database=\"delirium_v4_0_1\",\n",
" user=\"username\",\n",
" password=\"password\",\n",
")\n",
"# List all tables.\n",
"querier.list_tables()"
"querier.list_tables(\"public\")"
]
},
{
Expand All @@ -98,15 +108,15 @@
"name": "stderr",
"output_type": "stream",
"text": [
"2023-03-21 17:09:20,365 \u001b[1;37mINFO\u001b[0m cyclops.query.orm - Query returned successfully!\n",
"2023-03-21 17:09:20,366 \u001b[1;37mINFO\u001b[0m cyclops.utils.profile - Finished executing function run_query in 4.825447 s\n"
"2023-09-21 09:17:51,902 \u001b[1;37mINFO\u001b[0m cyclops.query.orm - Query returned successfully!\n",
"2023-09-21 09:17:51,903 \u001b[1;37mINFO\u001b[0m cyclops.utils.profile - Finished executing function run_query in 6.093352 s\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"189606 rows extracted!\n"
"189734 rows extracted!\n"
]
}
],
Expand All @@ -118,8 +128,8 @@
" ),\n",
" qo.Distinct(\"patient_id_hashed\"),\n",
")\n",
"encounters_qi = querier.public.ip_administrative(ops=ops)\n",
"encounters = encounters_qi.run()\n",
"encounters = querier.public.admdad()\n",
"encounters = encounters.ops(ops).run()\n",
"print(f\"{len(encounters)} rows extracted!\")"
]
},
Expand All @@ -141,25 +151,27 @@
"name": "stderr",
"output_type": "stream",
"text": [
"2023-03-21 17:09:21,372 \u001b[1;37mINFO\u001b[0m cyclops.query.orm - Query returned successfully!\n",
"2023-03-21 17:09:21,373 \u001b[1;37mINFO\u001b[0m cyclops.utils.profile - Finished executing function run_query in 0.992901 s\n"
"2023-09-21 09:17:52,591 \u001b[1;37mINFO\u001b[0m cyclops.query.orm - Query returned successfully!\n",
"2023-09-21 09:17:52,592 \u001b[1;37mINFO\u001b[0m cyclops.utils.profile - Finished executing function run_query in 0.675141 s\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"17071 rows extracted!\n"
"32567 rows extracted!\n"
]
}
],
"source": [
"ops = qo.Sequential(\n",
" qo.Cast(\"discharge_date_time\", \"timestamp\"),\n",
" qo.ConditionAfterDate(\"discharge_date_time\", \"2015-04-01\"),\n",
" qo.ConditionBeforeDate(\"discharge_date_time\", \"2016-03-31\"),\n",
")\n",
"encounters_qi = querier.get_interface(encounters_qi.query, ops=ops)\n",
"encounters = encounters_qi.run()\n",
"encounters_query = querier.public.admdad()\n",
"encounters_query = encounters_query.ops(ops)\n",
"encounters = encounters_query.run()\n",
"print(f\"{len(encounters)} rows extracted!\")"
]
},
Expand All @@ -181,8 +193,8 @@
"name": "stderr",
"output_type": "stream",
"text": [
"2023-03-21 17:09:22,181 \u001b[1;37mINFO\u001b[0m cyclops.query.orm - Query returned successfully!\n",
"2023-03-21 17:09:22,182 \u001b[1;37mINFO\u001b[0m cyclops.utils.profile - Finished executing function run_query in 0.800708 s\n"
"2023-09-21 09:17:52,856 \u001b[1;37mINFO\u001b[0m cyclops.query.orm - Query returned successfully!\n",
"2023-09-21 09:17:52,857 \u001b[1;37mINFO\u001b[0m cyclops.utils.profile - Finished executing function run_query in 0.145693 s\n"
]
},
{
Expand All @@ -194,9 +206,8 @@
}
],
"source": [
"ops = qo.GroupByAggregate(\"hospital_id\", {\"hospital_id\": (\"count\", \"count\")})\n",
"encounters_per_site_qi = querier.get_interface(encounters_qi.query, ops=ops)\n",
"encounters_per_site = encounters_per_site_qi.run()\n",
"ops = qo.GroupByAggregate(\"hospital_num\", {\"hospital_num\": (\"count\", \"count\")})\n",
"encounters_per_site = encounters_query.ops(ops).run()\n",
"print(f\"{len(encounters_per_site)} rows extracted!\")"
]
},
Expand All @@ -205,7 +216,7 @@
"id": "848c5b90-1e66-4a5a-9ebb-9c0caa105c3f",
"metadata": {},
"source": [
"## Example 2a. How many sodium tests were placed between Apr 1, 2015 and May 31, 2015 at SMH?"
"## Example 2a. How many sodium tests were placed between Apr 1, 2015 and May 31, 2015 at hospital 101?"
]
},
{
Expand All @@ -218,76 +229,37 @@
"name": "stderr",
"output_type": "stream",
"text": [
"2023-03-21 17:09:59,972 \u001b[1;37mINFO\u001b[0m cyclops.query.orm - Query returned successfully!\n",
"2023-03-21 17:09:59,973 \u001b[1;37mINFO\u001b[0m cyclops.utils.profile - Finished executing function run_query in 37.740935 s\n"
"2023-09-21 09:26:19,814 \u001b[1;37mINFO\u001b[0m cyclops.query.orm - Query returned successfully!\n",
"2023-09-21 09:26:19,815 \u001b[1;37mINFO\u001b[0m cyclops.utils.profile - Finished executing function run_query in 506.939296 s\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"1 rows extracted!\n"
"1 rows extracted!\n",
" hospital_num count\n",
"0 101 9305\n"
]
},
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>hospital_id</th>\n",
" <th>count</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>SMH</td>\n",
" <td>6053</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" hospital_id count\n",
"0 SMH 6053"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"encounter_ops = qo.ConditionEquals(\"hospital_id\", \"SMH\")\n",
"encounters_qi = querier.public.ip_administrative(ops=encounter_ops)\n",
"encounters = querier.public.admdad()\n",
"encounters = encounters.ops(qo.ConditionEquals(\"hospital_num\", 101))\n",
"lab_ops = qo.Sequential(\n",
" qo.ConditionAfterDate(\"sample_collection_date_time\", \"2015-04-01\"),\n",
" qo.ConditionBeforeDate(\"sample_collection_date_time\", \"2015-05-31\"),\n",
" qo.ConditionSubstring(\"lab_test_name_mapped\", \"sodium\"),\n",
" qo.GroupByAggregate(\"hospital_id\", {\"hospital_id\": (\"count\", \"count\")}),\n",
" qo.DropEmpty(\"collection_date_time\"),\n",
" qo.Cast(\"collection_date_time\", \"timestamp\"),\n",
" qo.ConditionAfterDate(\"collection_date_time\", \"2015-04-01\"),\n",
" qo.ConditionBeforeDate(\"collection_date_time\", \"2015-05-31\"),\n",
" qo.ConditionSubstring(\"test_type_mapped\", \"sodium\"),\n",
")\n",
"lab_qi = querier.public.lab(\n",
" ops=lab_ops,\n",
" join=qo.JoinArgs(join_table=encounters_qi.query, on=\"genc_id\"),\n",
"labs = querier.public.lab()\n",
"labs = labs.ops(lab_ops)\n",
"encounters_labs = encounters.join(labs, on=\"genc_id\")\n",
"encounters_labs = encounters_labs.ops(\n",
" qo.GroupByAggregate(\"hospital_num\", {\"hospital_num\": (\"count\", \"count\")}),\n",
")\n",
"sodium_tests = lab_qi.run()\n",
"sodium_tests = encounters_labs.run()\n",
"print(f\"{len(sodium_tests)} rows extracted!\")\n",
"print(sodium_tests)"
]
Expand Down
Loading