DBT 1.3 (python udfs) #82

Closed · wants to merge 6 commits
17 changes: 12 additions & 5 deletions README.md
@@ -410,9 +410,9 @@ generated one.
7. `associated_columns_not_null`
<br><br>
```
INSERT INTO dot.configured_tests VALUES(TRUE, 'ScanProject1', 'd74fc600-31c3-307d-9501-5b7f6b09aff5', 'MISSING-1', 3, '',
'', '', 'dot_model__iccmview_assessment', 'associated_columns_not_null', 'diarrhea_dx', 'diarrhea diagnosis',
$${"name": "diarrhea_dx_has_duration", "col_value": True, "associated_columns": ['max_symptom_duration']}$$,
INSERT INTO dot.configured_tests VALUES(TRUE, 'ScanProject1', 'd74fc600-31c3-307d-9501-5b7f6b09aff5', 'MISSING-1', 3,
'diarrhea diagnosis', '', '', 'ca4513fa-96e0-3a95-a1a8-7f0c127ea82a', 'associated_columns_not_null', '', '',
$${"name": "diarrhea_dx_has_duration", "condition": "diarrhea_dx = True", "associated_columns": ['max_symptom_duration']}$$,
'2021-12-23 19:00:00.000 -0500', '2021-12-23 19:00:00.000 -0500', 'your-name');
```
8. `expect_similar_means_across_reporters`
@@ -933,6 +933,7 @@ need to do is ..

1. `exec -it dot /bin/bash`
2. `pytest dot/self_tests/unit`
3. `pytest dot/self_tests/integration`

##### On your local machine

@@ -972,11 +973,17 @@ ScanProjec1_db:
And finally you can run the tests from a terminal as follows:
```
pytest dot/self_tests/unit
pytest dot/self_tests/integration
```
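
As a rough illustration only, a new unit test following the guidelines in the next section might look like the sketch below; the base-class name and import path are assumptions made for the sketch, not copied from the repository:

```python
# Sketch only: the real base class lives in dot/self_tests/unit/base_self_test_class.py,
# but its class name and the import path below are assumed here, not copied from the repo.
from self_tests.unit.base_self_test_class import BaseSelfTestClass


def slugify(name: str) -> str:
    """Toy function standing in for the module under test."""
    return name.strip().lower().replace(" ", "_")


class SlugifySelfTest(BaseSelfTestClass):
    # The base class is described as recreating an output directory and mocking
    # the config files, so the test body can focus on the behaviour under test.
    def test_slugify_normalises_names(self):
        assert slugify(" Diarrhea DX ") == "diarrhea_dx"
```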

#### Guidelines for adding new tests
- Existing tests are at [the self-tests folder](dot/self_tests/unit)
- All tests extend the [test base class](dot/self_tests/unit/base_self_test_class.py) that
- Existing unit tests are at [the self-tests folder](dot/self_tests/unit)
- When a function needs to be modified, ideally it will have a passing test beforehand; if not, please consider adding one
- There is one integration test at [the integration self-tests folder](dot/self_tests/integration) that
  - instead of running unit tests for individual functions, runs the full DOT pipeline on the fake data and checks the results
  - runs all the tests configured in [sample_dot_data.sql](db/dot/4-upload_sample_dot_data.sql)
  - whenever a new test type is designed for DOT, consider adding a line to that SQL file so that the new type gets covered
- All tests (both unit & integration) extend the [test base class](dot/self_tests/unit/base_self_test_class.py) that
  - facilitates the import of modules under test
  - recreates a directory in the file system for the test outputs
  - provides a number of functions for supporting tests that access the database, mocking the config files to point to the
2 changes: 1 addition & 1 deletion db/dot/1-schema.sql
@@ -26,7 +26,7 @@ CREATE TABLE IF NOT EXISTS dot.test_parameters_interface(
parameter VARCHAR(300) NOT NULL,
parameter_type VARCHAR(300) CHECK(parameter_type IN ('entity any field', 'entity id field', 'entity columns boolean logic',
'view/table', 'entity date field', 'one of (hour, day, week)',
'entity numeric field','sql statement','list of values')),
'entity numeric field', 'sql statement', 'list of values')),
example VARCHAR(300) NOT NULL,
description VARCHAR(1000) NOT NULL,
UNIQUE (test_type, parameter),
2 changes: 2 additions & 0 deletions db/dot/2-upload_static_data.sql
@@ -55,6 +55,8 @@ INSERT INTO dot.test_parameters_interface VALUES('expect_similar_means_across_re
INSERT INTO dot.test_parameters_interface VALUES('expect_similar_means_across_reporters', 'quantity', 'entity numeric field', 'temperature', 'The name of the numeric field to analyze for variation');
INSERT INTO dot.test_parameters_interface VALUES('expect_similar_means_across_reporters', 'data_table', 'view/table', 'dot_model__iccmview_assessment', 'The name of entity view where data is');
INSERT INTO dot.test_parameters_interface VALUES('expect_similar_means_across_reporters', 'id_column', 'entity id field', 'reported_by', 'The id column to use to get failed test records');
INSERT INTO dot.test_parameters_interface VALUES('associated_columns_not_null', 'condition', 'entity columns boolean logic', 'stops = "non-stop"', 'Where clause of rows that are going to be checked');
INSERT INTO dot.test_parameters_interface VALUES('associated_columns_not_null', 'associated_columns', 'list of values', $$["price", "origin_iata", "destination_iata"]$$, 'List of column names that should not be null');

-- dot.scenario_test_types
INSERT INTO dot.scenario_test_types VALUES('MISSING-1', 'associated_columns_not_null');
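
The intent encoded by the two new `associated_columns_not_null` parameters above can be sketched in plain Python. This is only an illustration of the check's logic: in DOT the `condition` is a SQL where clause rather than a Python callable, and the sample rows and function name below are made up for the example.

```python
# Illustration only: rows selected by `condition` fail the check when any column
# listed in `associated_columns` is null. Data and function names are made up.
def failing_rows(rows, condition, associated_columns):
    return [
        row
        for row in rows
        if condition(row) and any(row.get(col) is None for col in associated_columns)
    ]


flights = [
    {"stops": "non-stop", "price": 120, "origin_iata": "MAD", "destination_iata": "LHR"},
    {"stops": "non-stop", "price": None, "origin_iata": "MAD", "destination_iata": None},
]
assert failing_rows(
    flights,
    condition=lambda row: row["stops"] == "non-stop",
    associated_columns=["price", "origin_iata", "destination_iata"],
) == [flights[1]]  # only the second row violates the rule
```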
16 changes: 14 additions & 2 deletions db/dot/4-upload_sample_dot_data.sql
@@ -65,8 +65,9 @@ $${"table_specific_reported_date": "departure_time", "table_specific_patient_uui
"uuid", "table_specific_period": "day"}$$, '2021-12-23 19:00:00.000 -0500', '2022-03-21 19:00:00.000 -0500', 'Matt');

INSERT INTO dot.configured_tests VALUES(TRUE, 'ScanProject1', 'c4a3da8f-32f4-4e9b-b135-354de203ca90', 'TREAT-1',
5, 'Number of stops has a reasonable value', '', '', 'ca4513fa-96e0-3a95-a1a8-7f0c127ea82a', 'custom_sql', '', '',
format('{%s: %s}',
5, 'Number of stops has a reasonable value (as custom_sql)', '', '', 'ca4513fa-96e0-3a95-a1a8-7f0c127ea82a',
'custom_sql', '', '', format(
'{%s: %s}',
to_json('query'::text),
to_json($query$
select
@@ -79,6 +80,17 @@ format('{%s: %s}',
)::json,
'2021-12-23 19:00:00.000 -0500', '2021-12-23 19:00:00.000 -0500', 'Lorenzo');

INSERT INTO dot.configured_tests VALUES(TRUE, 'ScanProject1', '3081f033-e8f4-4f3b-aea8-36f8c5df05dc', 'INCONSISTENT-1',
8, 'Price is a positive number for direct flights', '', '', 'ca4513fa-96e0-3a95-a1a8-7f0c127ea82a', 'expression_is_true',
'', '', $${"name": "t_direct_flights_positive_price", "expression": "price is not null and price > 0",
"condition": "stops = 'non-stop'"}$$, '2022-12-10 19:00:00.000 -0500', '2022-12-10 19:00:00.000 -0500', 'Lorenzo');

INSERT INTO dot.configured_tests VALUES(TRUE, 'ScanProject1', 'd74fc600-31c3-307d-9501-5b7f6b09aff5', 'MISSING-1',
3, 'Direct flights have price, origin & destination', '', '', 'ca4513fa-96e0-3a95-a1a8-7f0c127ea82a',
'associated_columns_not_null', '', '', $${"name": "t_direct_flights_have_data", "condition": "stops = 'non-stop'",
"associated_columns": ["price", "origin_iata", "destination_iata"]}$$, '2022-12-11 19:00:00.000 -0500',
'2022-12-11 19:00:00.000 -0500', 'Lorenzo');

COMMIT;


145 changes: 74 additions & 71 deletions docker/airflow/dags/run_dot_project.py
@@ -20,11 +20,11 @@


def get_object(
object_name_in,
earliest_date_to_sync,
date_field,
source_conn_in,
columns_to_exclude,
object_name_in,
earliest_date_to_sync,
date_field,
source_conn_in,
columns_to_exclude,
):
"""

@@ -47,18 +47,9 @@ def get_object(

connection = BaseHook.get_connection(source_conn_in)

sql_stmt = (
"SELECT * FROM "
+ connection.schema
+ "."
+ object_name_in
)
sql_stmt = "SELECT * FROM " + connection.schema + "." + object_name_in
if date_field != None:
sql_stmt += (" WHERE "
+ date_field
+ " >= '"
+ earliest_date_to_sync
+ "'")
sql_stmt += " WHERE " + date_field + " >= '" + earliest_date_to_sync + "'"
print(sql_stmt)
pg_hook = PostgresHook(postgres_conn_id=source_conn_in, schema=source_conn_in)
pg_conn = pg_hook.get_conn()
@@ -67,13 +58,13 @@ def get_object(
data = cursor.fetchall()

sql_stmt = (
"SELECT column_name, data_type FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name = '"
+ object_name_in
+ "'"
+ "AND table_schema = '"
+ connection.schema
+ "' "
+ " ORDER BY ordinal_position "
"SELECT column_name, data_type FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name = '"
+ object_name_in
+ "'"
+ "AND table_schema = '"
+ connection.schema
+ "' "
+ " ORDER BY ordinal_position "
)
print(sql_stmt)
pg_hook = PostgresHook(postgres_conn_id=source_conn_in, schema=source_conn_in)
@@ -85,16 +76,20 @@ def get_object(
# Fail back to views
if len(columns) == 0:
sql_stmt = (
"SELECT a.attname as \"column_name\","
+ " pg_catalog.format_type(a.atttypid, a.atttypmod) as \"data_type\" "
+ " FROM pg_attribute a "
+ " JOIN pg_class t on a.attrelid = t.oid "
+ " JOIN pg_namespace s on t.relnamespace = s.oid "
+ " WHERE a.attnum > 0 "
+ " AND NOT a.attisdropped "
+ " AND t.relname = '" + object_name_in + "' "
+ " AND s.nspname = '" + connection.schema + "' "
+ " ORDER BY a.attnum; "
'SELECT a.attname as "column_name",'
+ ' pg_catalog.format_type(a.atttypid, a.atttypmod) as "data_type" '
+ " FROM pg_attribute a "
+ " JOIN pg_class t on a.attrelid = t.oid "
+ " JOIN pg_namespace s on t.relnamespace = s.oid "
+ " WHERE a.attnum > 0 "
+ " AND NOT a.attisdropped "
+ " AND t.relname = '"
+ object_name_in
+ "' "
+ " AND s.nspname = '"
+ connection.schema
+ "' "
+ " ORDER BY a.attnum; "
)
print(sql_stmt)
pg_hook = PostgresHook(postgres_conn_id=source_conn_in, schema=source_conn_in)
@@ -136,7 +131,7 @@ def get_object(


def save_object(
object_name_in, target_conn_in, data_in, column_list_in, type_list_in, source_db_in
object_name_in, target_conn_in, data_in, column_list_in, type_list_in, source_db_in
):
"""

@@ -164,16 +159,16 @@ def save_object(

connection = BaseHook.get_connection(target_conn_in)
connection_string = (
"postgresql://"
+ str(connection.login)
+ ":"
+ str(connection.password)
+ "@"
+ str(connection.host)
+ ":"
+ str(connection.port)
+ "/"
+ target_conn_in
"postgresql://"
+ str(connection.login)
+ ":"
+ str(connection.password)
+ "@"
+ str(connection.host)
+ ":"
+ str(connection.port)
+ "/"
+ target_conn_in
)

engine = create_engine(
@@ -190,7 +185,7 @@ def save_object(
# This will also drop any DOT model views onto this data
if MODE == "replace":
with PostgresHook(
postgres_conn_id=target_conn_in, schema=target_conn_in
postgres_conn_id=target_conn_in, schema=target_conn_in
).get_conn() as conn:
cur = conn.cursor()
query = f"DROP TABLE IF EXISTS {schema}.{object_name_in} CASCADE;"
@@ -202,7 +197,7 @@

# Test to see if schema exists, if not, create
with PostgresHook(
postgres_conn_id=target_conn_in, schema=target_conn_in
postgres_conn_id=target_conn_in, schema=target_conn_in
).get_conn() as conn:
cur = conn.cursor()
query = f"CREATE SCHEMA IF NOT EXISTS {schema};"
@@ -220,20 +215,20 @@
using = f"USING {col}::{type}"
query = f"ALTER TABLE {schema}.{object_name_in} ALTER COLUMN {col} TYPE {type} {using};"
with PostgresHook(
postgres_conn_id=target_conn_in, schema=target_conn_in
postgres_conn_id=target_conn_in, schema=target_conn_in
).get_conn() as conn:
cur = conn.cursor()
print(query)
cur.execute(query)


def sync_object(
object_name_in,
earliest_date_to_sync,
date_field,
source_conn_in,
target_conn_in,
columns_to_exclude,
object_name_in,
earliest_date_to_sync,
date_field,
source_conn_in,
target_conn_in,
columns_to_exclude,
):
"""

@@ -269,12 +264,14 @@ def sync_object(
object_name_in, target_conn_in, data, column_list, type_list, source_conn_in
)


def drop_tables_in_dot_tests_schema(target_conn_in, schema_to_drop_from):
"""
We are syncing new data where new columns and columns types might change.
Postgres will prevent ALTER TABLE if any views exist, so we will drop all tables in the dot test schema.
These will be recreated in the dot run.
This assumes the dot tests schema is dot_data_tests (defined as variable "schema_to_drop_from").
Postgres will prevent ALTER TABLE if any views exist, so we will drop all
tables in the dot test schema. These will be recreated in the dot run.
This assumes the dot tests schema is dot_data_tests
(defined as variable "schema_to_drop_from").

Input
-----
@@ -288,21 +285,24 @@ def drop_tables_in_dot_tests_schema(target_conn_in, schema_to_drop_from):
"""

with PostgresHook(
postgres_conn_id=target_conn_in, schema=target_conn_in
postgres_conn_id=target_conn_in, schema=target_conn_in
).get_conn() as conn:
cur = conn.cursor()
query1 = f"SET search_path TO {schema_to_drop_from}"
query2 = f"DO $$ DECLARE " \
f"r RECORD; " \
f"BEGIN " \
f"FOR r IN (SELECT table_name FROM information_schema.tables WHERE table_schema = current_schema()) " \
f"LOOP " \
f"EXECUTE 'DROP TABLE IF EXISTS ' || QUOTE_IDENT(r.table_name) || ' CASCADE'; " \
f"END LOOP; " \
f"END $$; "
query2 = (
f"DO $$ DECLARE "
f"r RECORD; "
f"BEGIN "
f"FOR r IN (SELECT table_name FROM information_schema.tables WHERE table_schema = current_schema()) "
f"LOOP "
f"EXECUTE 'DROP TABLE IF EXISTS ' || QUOTE_IDENT(r.table_name) || ' CASCADE'; "
f"END LOOP; "
f"END $$; "
)
cur.execute(query1)
cur.execute(query2)


def run_dot_app(project_id_in):
"""
Method to run the DOT.
@@ -335,10 +335,10 @@ def default_config():


with DAG(
dag_id="run_dot_project",
schedule_interval="@weekly",
start_date=datetime(year=2022, month=3, day=1),
catchup=False,
dag_id="run_dot_project",
schedule_interval="@weekly",
start_date=datetime(year=2022, month=3, day=1),
catchup=False,
) as dag:
config = json.loads(Variable.get("dot_config", default_var=default_config().read()))

@@ -374,7 +374,7 @@ def default_config():
python_callable=drop_tables_in_dot_tests_schema,
op_kwargs={
"target_conn_in": target_conn,
"schema_to_drop_from": schema_to_drop_from
"schema_to_drop_from": schema_to_drop_from,
},
dag=dag,
)
@@ -384,7 +384,10 @@
for i in range(len(objects_to_sync)):

object_name = objects_to_sync[i]["object"]
if "date_field" in objects_to_sync[i] and objects_to_sync[i]["date_field"] != "":
if (
"date_field" in objects_to_sync[i]
and objects_to_sync[i]["date_field"] != ""
):
date_field = objects_to_sync[i]["date_field"]
else:
date_field = None
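
For reference, the sync loop above only reads the `object` and `date_field` keys of each `objects_to_sync` entry. A minimal sketch of the expected shape is below; the entry values are hypothetical, and the real `dot_config` Airflow Variable may carry additional fields (for example the columns to exclude) that are not shown here.

```python
# Hypothetical objects_to_sync entries, inferred only from the keys accessed above.
objects_to_sync_example = [
    {"object": "flights", "date_field": "departure_time"},  # synced incrementally from earliest_date_to_sync
    {"object": "airports", "date_field": ""},               # empty/absent date_field: the whole object is synced
]
```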
14 changes: 8 additions & 6 deletions docker/run_demo.py
@@ -10,7 +10,7 @@

url_demo_data = "https://drive.google.com/uc?id=157Iad8mHnwbZ_dAeLQy5XfLihhcpD6yc"
filename_demo_data = "dot_demo_data.tar.gz"
url_dot_ui = "http://localhost:82/app/data-observation-toolkit/run-log-634491ea0da61b0e9f38760d?embed=True"
url_dot_ui = "http://localhost:82/app/data-observation-toolkit/run-log-634491ea0da61b0e9f38760d?embed=True" # pylint: disable=line-too-long

# Check if db, appsmith and tar file are there and if so, delete them.
os.chdir("demo/")
@@ -30,12 +30,12 @@

# Open/Extract tarfile
with tarfile.open(filename_demo_data) as my_tar:
my_tar.extractall('')
my_tar.extractall("")
my_tar.close()

with open("./db/.env") as f:
demo_pwd=f.read().split("=")[1]
os.environ['POSTGRES_PASSWORD'] = demo_pwd
demo_pwd = f.read().split("=")[1]
os.environ["POSTGRES_PASSWORD"] = demo_pwd

# Composing and running container(s)
print("Starting DOT...\n")
@@ -49,8 +49,10 @@

webbrowser.open(url_dot_ui)

print("In case DOT was not opened in your browser, please go to this URL: "
"http://localhost:82/app/data-observation-toolkit/run-log-634491ea0da61b0e9f38760d?embed=True\n")
print(
"In case DOT was not opened in your browser, please go to this URL: "
"http://localhost:82/app/data-observation-toolkit/run-log-634491ea0da61b0e9f38760d?embed=True\n"
)
input("Press return to stop DOT container\n")
print("Container is being stopped - we hope you enjoyed this demo :)")
docker.compose.stop()