Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding TMC canalespy query helpers #960

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
67 changes: 67 additions & 0 deletions parsons/databases/table.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import datetime

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -164,3 +165,69 @@ def truncate(self):

self.db.query(f"TRUNCATE TABLE {self.table}")
logger.info(f"{self.table} truncated.")

def dedup_table(
self,
order_by_column_name=None,
order_by_direction=None,
cascade=False,
columns_to_ignore=None,
):
"""
Description:
This function re-creates a deduped version of a table by grabbing
all columns and inserting those into a partition statement for
row_number().
Args:
sharinetmc marked this conversation as resolved.
Show resolved Hide resolved
order_by_column_name: str (optional)
Column name of specific column that you would like to dedup using order by
order_by_direction: str (optional)
Order by direction, if you would like to dedup by ordering by a specific column,
this is the direction of the order by
example: 'asc'
cascade: bool (optional)
Set to True if you want any dependent views to be dropped -
queries will fail if there are dependent views and this is set to False.
columns_to_ignore: list (optional)
List any columns that should be ignored in the dedup
"""
current_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
run_cascade = "CASCADE" if cascade else ""
order_by_column_name = (
"random()" if order_by_column_name is None else order_by_column_name
)
if order_by_direction is None and order_by_column_name is not None:
raise Exception("order_by_direction argument is blank")

columns_list = self.columns

# remove order_by columns
columns_list.remove(
order_by_column_name
) if order_by_column_name is not None else None

# remove ignore columns
if columns_to_ignore is not None:
for column in columns_to_ignore:
columns_list.remove(column)

partition = ", ".join(columns_list)

dedup_query = f"""
sharinetmc marked this conversation as resolved.
Show resolved Hide resolved
create table {self.table}_temp_{current_timestamp} as
(select *
, row_number() over (partition by {partition}
order by {order_by_column_name} {order_by_direction}) as dup
from {self.table})
where dup=1;
alter table {self.table}_temp_{current_timestamp}
drop column dup;
truncate table {self.table}
insert into {self.table} (select * from {self.table}_temp_{current_timestamp})
{run_cascade};
sharinetmc marked this conversation as resolved.
Show resolved Hide resolved
"""

self.db.query(dedup_query)
logger.info(f"Finished deduping {self.table}...")

return None
5 changes: 5 additions & 0 deletions test/test_databases/test_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ def test_truncate(self):
self.tbl.truncate()
self.assertEqual(self.tbl.num_rows, 0)

def test_dedup_table(self):

self.tbl.dedup_table(order_by_column_name="user_name")
sharinetmc marked this conversation as resolved.
Show resolved Hide resolved
self.assertEqual(self.tbl.num_rows, 2)

def test_get_rows(self):

data = [
Expand Down
Loading