From ea2751afbbfcad0d0b9bf0cd9375e775ccbb6d76 Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Mon, 18 Dec 2023 08:24:27 -1000 Subject: [PATCH 1/9] add get_sql_from_file function --- parsons/utilities/sql_helpers.py | 14 ++++++++++++++ test/test_utilities.py | 14 ++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/parsons/utilities/sql_helpers.py b/parsons/utilities/sql_helpers.py index fe3753c971..3196c80cea 100644 --- a/parsons/utilities/sql_helpers.py +++ b/parsons/utilities/sql_helpers.py @@ -12,3 +12,17 @@ def redact_credentials(sql): sql_censored = re.sub(pattern, "CREDENTIALS REDACTED", sql, flags=re.IGNORECASE) return sql_censored + + +def sql_from_file(sql_file): + """ + Description: + This function allows you to grab SQL defined in a separate file. + `Args`: + sql_file: str + The relevant file path + `Returns:` + The SQL from the file + """ + with open(sql_file, "r") as f: + return f.read() diff --git a/test/test_utilities.py b/test/test_utilities.py index ec4a4eaf2d..c21ede4cba 100644 --- a/test/test_utilities.py +++ b/test/test_utilities.py @@ -150,6 +150,20 @@ def test_redact_credentials(): assert sql_helpers.redact_credentials(test_str) == test_result +def test_get_sql_from_file(): + + # Test query string + test_str = "select * from schema.tablename limit 10" + + # Create fake file. + os.mkdir("tmp") + test_file_name = "tmp/sql_file.txt" + with open(test_file_name, "w+") as sql_file: + sql_file.write(test_str) + + assert sql_helpers.get_sql_from_file(test_file_name) == test_str + + class TestCheckEnv(unittest.TestCase): def test_environment_field(self): """Test check field""" From 3fec174849ee274592def21acc571a63612676d0 Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Mon, 18 Dec 2023 09:39:12 -1000 Subject: [PATCH 2/9] add dedup_table function --- parsons/databases/table.py | 68 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/parsons/databases/table.py b/parsons/databases/table.py index a6b2f7464c..fed747f1b1 100644 --- a/parsons/databases/table.py +++ b/parsons/databases/table.py @@ -1,4 +1,5 @@ import logging +import datetime logger = logging.getLogger(__name__) @@ -164,3 +165,70 @@ def truncate(self): self.db.query(f"TRUNCATE TABLE {self.table}") logger.info(f"{self.table} truncated.") + + def dedup_table( + self, + order_by_column_name=None, + order_by_direction=None, + cascade=False, + columns_to_ignore=None, + ): + """ + Description: + This function re-creates a deduped version of a table by grabbing + all columns and inserting those into a partition statement for + row_number(). + Args: + order_by_column_name: str + Column name of specific column that you would like to dedup using order by + order_by_direction: str + Order by direction, if you would like to dedup by ordering by a specific column, + this is the direction of the order by + example: 'asc' + cascade: bool + Set to True if you want any dependent views to be dropped - + queries will fail if there are dependent views and this is set to False. + columns_to_ignore: list + List any columns that should be ignored in the dedup + """ + current_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + run_cascade = "CASCADE" if cascade else "" + order_by_column_name = ( + "random()" if order_by_column_name is None else order_by_column_name + ) + if order_by_direction is None and order_by_column_name is not None: + raise Exception("order_by_direction argument is blank") + + columns_list = self.columns + + # remove order_by columns + columns_list.remove( + order_by_column_name + ) if order_by_column_name is not None else None + + # remove ignore columns + if columns_to_ignore is not None: + for column in columns_to_ignore: + columns_list.remove(column) + + partition = ", ".join(columns_list) + + dedup_query = f""" + alter table {self.table} + rename to {self.table}temp_{current_timestamp}; + create table {self.table} as + select * from + (select * + , row_number() over (partition by {partition} + order by {order_by_column_name} {order_by_direction}) as dup + from {self.table}_temp_{current_timestamp}) + where dup=1; + alter table {self.table} + drop column dup; + drop table {self.table}temp_{current_timestamp} {run_cascade}; + """ + + self.db.query(dedup_query) + logger.info(f"Finished deduping {self.table}...") + + return None From 628e7bdc29450215f56a05e8c67f210a9441db14 Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Tue, 19 Dec 2023 05:48:00 -1000 Subject: [PATCH 3/9] add unit tests --- parsons/databases/table.py | 2 +- test/test_databases/test_mysql.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/parsons/databases/table.py b/parsons/databases/table.py index fed747f1b1..dfe9659c3f 100644 --- a/parsons/databases/table.py +++ b/parsons/databases/table.py @@ -215,7 +215,7 @@ def dedup_table( dedup_query = f""" alter table {self.table} - rename to {self.table}temp_{current_timestamp}; + rename to {self.table}_temp_{current_timestamp}; create table {self.table} as select * from (select * diff --git a/test/test_databases/test_mysql.py b/test/test_databases/test_mysql.py index 323b4ffbf6..f8d397d4f6 100644 --- a/test/test_databases/test_mysql.py +++ b/test/test_databases/test_mysql.py @@ -108,6 +108,11 @@ def test_truncate(self): self.tbl.truncate() self.assertEqual(self.tbl.num_rows, 0) + def test_dedup_table(self): + + self.tbl.dedup_table(order_by_column_name="user_name") + self.assertEqual(self.tbl.num_rows, 2) + def test_get_rows(self): data = [ From 47aeb557ff216ef546cd17bfe3bd8560f2974617 Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Tue, 19 Dec 2023 05:56:11 -1000 Subject: [PATCH 4/9] add save unsaved function --- parsons/utilities/sql_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsons/utilities/sql_helpers.py b/parsons/utilities/sql_helpers.py index 3196c80cea..a2fa754fa1 100644 --- a/parsons/utilities/sql_helpers.py +++ b/parsons/utilities/sql_helpers.py @@ -14,7 +14,7 @@ def redact_credentials(sql): return sql_censored -def sql_from_file(sql_file): +def get_sql_from_file(sql_file): """ Description: This function allows you to grab SQL defined in a separate file. From f5cde3a910bd18aed56f62feacfbb00c418f072d Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Tue, 19 Dec 2023 06:01:07 -1000 Subject: [PATCH 5/9] add get_sql_from_file to __all__ --- parsons/utilities/sql_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsons/utilities/sql_helpers.py b/parsons/utilities/sql_helpers.py index a2fa754fa1..3ab65b3b44 100644 --- a/parsons/utilities/sql_helpers.py +++ b/parsons/utilities/sql_helpers.py @@ -1,6 +1,6 @@ import re -__all__ = ["redact_credentials"] +__all__ = ["redact_credentials", "get_sql_from_file"] def redact_credentials(sql): From 68cfc5da0eab1a8417c2bd30f919023e1f1f8a0b Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Mon, 20 May 2024 03:09:24 -1000 Subject: [PATCH 6/9] pr comments fix --- parsons/databases/table.py | 21 ++++++++++----------- parsons/utilities/sql_helpers.py | 16 +--------------- test/test_utilities.py | 14 -------------- 3 files changed, 11 insertions(+), 40 deletions(-) diff --git a/parsons/databases/table.py b/parsons/databases/table.py index dfe9659c3f..3c0e06c85c 100644 --- a/parsons/databases/table.py +++ b/parsons/databases/table.py @@ -179,16 +179,16 @@ def dedup_table( all columns and inserting those into a partition statement for row_number(). Args: - order_by_column_name: str + order_by_column_name: str (optional) Column name of specific column that you would like to dedup using order by - order_by_direction: str + order_by_direction: str (optional) Order by direction, if you would like to dedup by ordering by a specific column, this is the direction of the order by example: 'asc' - cascade: bool + cascade: bool (optional) Set to True if you want any dependent views to be dropped - queries will fail if there are dependent views and this is set to False. - columns_to_ignore: list + columns_to_ignore: list (optional) List any columns that should be ignored in the dedup """ current_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") @@ -214,18 +214,17 @@ def dedup_table( partition = ", ".join(columns_list) dedup_query = f""" - alter table {self.table} - rename to {self.table}_temp_{current_timestamp}; - create table {self.table} as - select * from + create table {self.table}_temp_{current_timestamp} as (select * , row_number() over (partition by {partition} order by {order_by_column_name} {order_by_direction}) as dup - from {self.table}_temp_{current_timestamp}) + from {self.table}) where dup=1; - alter table {self.table} + alter table {self.table}_temp_{current_timestamp} drop column dup; - drop table {self.table}temp_{current_timestamp} {run_cascade}; + truncate table {self.table} + insert into {self.table} (select * from {self.table}_temp_{current_timestamp}) + {run_cascade}; """ self.db.query(dedup_query) diff --git a/parsons/utilities/sql_helpers.py b/parsons/utilities/sql_helpers.py index 3ab65b3b44..fe3753c971 100644 --- a/parsons/utilities/sql_helpers.py +++ b/parsons/utilities/sql_helpers.py @@ -1,6 +1,6 @@ import re -__all__ = ["redact_credentials", "get_sql_from_file"] +__all__ = ["redact_credentials"] def redact_credentials(sql): @@ -12,17 +12,3 @@ def redact_credentials(sql): sql_censored = re.sub(pattern, "CREDENTIALS REDACTED", sql, flags=re.IGNORECASE) return sql_censored - - -def get_sql_from_file(sql_file): - """ - Description: - This function allows you to grab SQL defined in a separate file. - `Args`: - sql_file: str - The relevant file path - `Returns:` - The SQL from the file - """ - with open(sql_file, "r") as f: - return f.read() diff --git a/test/test_utilities.py b/test/test_utilities.py index c21ede4cba..ec4a4eaf2d 100644 --- a/test/test_utilities.py +++ b/test/test_utilities.py @@ -150,20 +150,6 @@ def test_redact_credentials(): assert sql_helpers.redact_credentials(test_str) == test_result -def test_get_sql_from_file(): - - # Test query string - test_str = "select * from schema.tablename limit 10" - - # Create fake file. - os.mkdir("tmp") - test_file_name = "tmp/sql_file.txt" - with open(test_file_name, "w+") as sql_file: - sql_file.write(test_str) - - assert sql_helpers.get_sql_from_file(test_file_name) == test_str - - class TestCheckEnv(unittest.TestCase): def test_environment_field(self): """Test check field""" From 37d481cf1b91a89b7ea27d27305c43053bd551a7 Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Thu, 23 May 2024 12:25:44 -1000 Subject: [PATCH 7/9] pr comments --- parsons/databases/table.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/parsons/databases/table.py b/parsons/databases/table.py index 3c0e06c85c..1453166306 100644 --- a/parsons/databases/table.py +++ b/parsons/databases/table.py @@ -193,18 +193,14 @@ def dedup_table( """ current_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") run_cascade = "CASCADE" if cascade else "" - order_by_column_name = ( - "random()" if order_by_column_name is None else order_by_column_name - ) + order_by_column_name = "random()" if order_by_column_name is None else order_by_column_name if order_by_direction is None and order_by_column_name is not None: raise Exception("order_by_direction argument is blank") columns_list = self.columns # remove order_by columns - columns_list.remove( - order_by_column_name - ) if order_by_column_name is not None else None + columns_list.remove(order_by_column_name) if order_by_column_name is not None else None # remove ignore columns if columns_to_ignore is not None: @@ -222,9 +218,10 @@ def dedup_table( where dup=1; alter table {self.table}_temp_{current_timestamp} drop column dup; - truncate table {self.table} + truncate table {self.table}; insert into {self.table} (select * from {self.table}_temp_{current_timestamp}) - {run_cascade}; + {run_cascade}; + drop table {self.table}_temp_{current_timestamp} """ self.db.query(dedup_query) From a41b9bb05d19fa0e781687d31c6aeeca94dbdee9 Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Thu, 23 May 2024 13:30:42 -1000 Subject: [PATCH 8/9] drop view --- parsons/databases/table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsons/databases/table.py b/parsons/databases/table.py index 0318802ff7..09ee242598 100644 --- a/parsons/databases/table.py +++ b/parsons/databases/table.py @@ -248,7 +248,7 @@ def dedup_table( truncate table {self.table}; insert into {self.table} (select * from {self.table}_temp_{current_timestamp}) {run_cascade}; - drop table {self.table}_temp_{current_timestamp} + drop view {self.table}_temp_{current_timestamp} """ self.db.query(dedup_query) From 1ebd453a0e232dc749400a35657a9e64b0a34891 Mon Sep 17 00:00:00 2001 From: sharinetmc Date: Fri, 24 May 2024 11:25:51 -1000 Subject: [PATCH 9/9] flake8 errors --- parsons/databases/table.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/parsons/databases/table.py b/parsons/databases/table.py index 5be6a36318..bfae1f38dc 100644 --- a/parsons/databases/table.py +++ b/parsons/databases/table.py @@ -1,7 +1,5 @@ import logging import datetime -from typing import Optional -from parsons import Table logger = logging.getLogger(__name__)