From dfa63d269ea2928957c02266c0dd97caf4845bd3 Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Wed, 1 May 2024 10:06:21 -0400 Subject: [PATCH 1/9] add schema checks --- hooli_data_eng/assets/raw_data/__init__.py | 6 ++++++ hooli_data_eng/definitions.py | 5 +++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/hooli_data_eng/assets/raw_data/__init__.py b/hooli_data_eng/assets/raw_data/__init__.py index b2deb42..bf47e6d 100644 --- a/hooli_data_eng/assets/raw_data/__init__.py +++ b/hooli_data_eng/assets/raw_data/__init__.py @@ -6,6 +6,7 @@ AssetCheckSeverity, AssetCheckResult, AssetKey, + build_column_schema_change_checks, BackfillPolicy, Backoff, DailyPartitionsDefinition, @@ -99,5 +100,10 @@ def orders(context, api: RawDataAPI) -> pd.DataFrame: all_orders_df['dt'] = pd.to_datetime(all_orders_df['dt'], unit = "ms") return all_orders_df +raw_data_schema_checks = build_column_schema_change_checks(assets=[ + AssetKey(["RAW_DATA", "orders"]), + AssetKey(["RAW_DATA", "users"]), +]) + from dagster_dbt import dbt_assets diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index fac6108..174c846 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -9,7 +9,7 @@ from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets from hooli_data_eng.assets.dbt_assets import dbt_slim_ci_job from hooli_data_eng.assets.marketing import check_avg_orders -from hooli_data_eng.assets.raw_data import check_users +from hooli_data_eng.assets.raw_data import check_users, raw_data_schema_checks from hooli_data_eng.jobs import analytics_job, predict_job from hooli_data_eng.resources import get_env, resource_def from hooli_data_eng.schedules import analytics_schedule @@ -42,6 +42,7 @@ dbt_assets = load_assets_from_modules([dbt_assets]) + # Our final set of assets represent Python code that # should run after dbt. These assets are defined in # assets/forecasting/__init__.py @@ -62,7 +63,7 @@ {"max_concurrent": 3} ), assets=[*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets], - asset_checks=[check_users, check_avg_orders, avg_orders_freshness_check, min_order_freshness_check], + asset_checks=[*raw_data_schema_checks, check_users, check_avg_orders, avg_orders_freshness_check, min_order_freshness_check], resources=resource_def[get_env()], schedules=[analytics_schedule, avg_orders_freshness_check_schedule], sensors=[ From e180a8ffdd69d6ac73b7ed3217d7233e5519b631 Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Wed, 1 May 2024 10:06:40 -0400 Subject: [PATCH 2/9] multi asset freshness check for testing --- hooli_data_eng/assets/marketing/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/hooli_data_eng/assets/marketing/__init__.py b/hooli_data_eng/assets/marketing/__init__.py index 8b1f760..0c24652 100644 --- a/hooli_data_eng/assets/marketing/__init__.py +++ b/hooli_data_eng/assets/marketing/__init__.py @@ -8,6 +8,7 @@ AssetExecutionContext, AssetCheckResult, asset_check, + AssetKey, FreshnessPolicy, define_asset_job, ScheduleDefinition, @@ -84,7 +85,9 @@ def key_product_deepdive(context, sku_stats): min_order_freshness_check = build_last_update_freshness_checks( - assets=[min_order], + assets=[min_order, + AssetKey(["RAW_DATA", "orders"]), + AssetKey(["RAW_DATA", "users"])], lower_bound_delta=datetime.timedelta( hours=24 ), # expect new data at least once a day From 23f6e3e1fb79b9e30f34015909725358084ba6c8 Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Wed, 1 May 2024 17:06:25 -0400 Subject: [PATCH 3/9] add dbt schema checks --- hooli_data_eng/definitions.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index 174c846..b736634 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -3,6 +3,7 @@ Definitions, load_assets_from_modules, load_assets_from_package_module, + build_column_schema_change_checks, multiprocess_executor, ) @@ -42,6 +43,7 @@ dbt_assets = load_assets_from_modules([dbt_assets]) +dbt_asset_checks = build_column_schema_change_checks(assets=[*dbt_assets]) # Our final set of assets represent Python code that # should run after dbt. These assets are defined in @@ -63,7 +65,7 @@ {"max_concurrent": 3} ), assets=[*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets], - asset_checks=[*raw_data_schema_checks, check_users, check_avg_orders, avg_orders_freshness_check, min_order_freshness_check], + asset_checks=[*raw_data_schema_checks, *dbt_asset_checks, check_users, check_avg_orders, avg_orders_freshness_check, min_order_freshness_check], resources=resource_def[get_env()], schedules=[analytics_schedule, avg_orders_freshness_check_schedule], sensors=[ From 1487d57666dc08f84f6b4d3e84c96145c5dc22a2 Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Thu, 2 May 2024 19:54:02 -0400 Subject: [PATCH 4/9] bump packages --- dbt_project/package-lock.yml | 6 +++--- dbt_project/packages.yml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dbt_project/package-lock.yml b/dbt_project/package-lock.yml index ab2c625..58c33e4 100644 --- a/dbt_project/package-lock.yml +++ b/dbt_project/package-lock.yml @@ -2,8 +2,8 @@ packages: - package: calogica/dbt_expectations version: 0.10.3 - git: https://github.com/dagster-io/dagster.git - revision: 0ffe859627ccb83c7fb4d9eae17ef55f5fdd6bef + revision: fbe9818b8c0f1abdfe35b39f1b6e42bdc43c0871 subdirectory: python_modules/libraries/dagster-dbt/dbt_packages/dagster - package: calogica/dbt_date - version: 0.10.0 -sha1_hash: d4d1f8c5fedc2d6f6b2284a96b38abefe890c658 + version: 0.10.1 +sha1_hash: 2bda3e86ee52048bc3def45b34737c0c18d93456 diff --git a/dbt_project/packages.yml b/dbt_project/packages.yml index 94823a7..d083f9f 100644 --- a/dbt_project/packages.yml +++ b/dbt_project/packages.yml @@ -3,4 +3,4 @@ packages: version: 0.10.3 - git: "https://github.com/dagster-io/dagster.git" subdirectory: "python_modules/libraries/dagster-dbt/dbt_packages/dagster" - revision: "1.7.1" # replace with the version of `dagster` you are using. + revision: "1.7.4" # replace with the version of `dagster` you are using. From 589ef3d0a5cd65e2e6085e6073d4d47a37b3c63e Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Thu, 9 May 2024 14:56:00 -0400 Subject: [PATCH 5/9] bump dbt package version --- dbt_project/package-lock.yml | 4 ++-- dbt_project/packages.yml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dbt_project/package-lock.yml b/dbt_project/package-lock.yml index 58c33e4..d2e1d6d 100644 --- a/dbt_project/package-lock.yml +++ b/dbt_project/package-lock.yml @@ -2,8 +2,8 @@ packages: - package: calogica/dbt_expectations version: 0.10.3 - git: https://github.com/dagster-io/dagster.git - revision: fbe9818b8c0f1abdfe35b39f1b6e42bdc43c0871 + revision: b75059b249d1388f99bd49b1b46f9c99686a3a6f subdirectory: python_modules/libraries/dagster-dbt/dbt_packages/dagster - package: calogica/dbt_date version: 0.10.1 -sha1_hash: 2bda3e86ee52048bc3def45b34737c0c18d93456 +sha1_hash: 0da1ef521792fc305eb81faabcffbc88ce85ec0e diff --git a/dbt_project/packages.yml b/dbt_project/packages.yml index d083f9f..1d8270d 100644 --- a/dbt_project/packages.yml +++ b/dbt_project/packages.yml @@ -3,4 +3,4 @@ packages: version: 0.10.3 - git: "https://github.com/dagster-io/dagster.git" subdirectory: "python_modules/libraries/dagster-dbt/dbt_packages/dagster" - revision: "1.7.4" # replace with the version of `dagster` you are using. + revision: "1.7.5" # replace with the version of `dagster` you are using. From 18b9bb61504e16e849be403e7bf8d2b1143bb374 Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Thu, 9 May 2024 15:03:43 -0400 Subject: [PATCH 6/9] lock dbt versions --- setup.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 6ff75e1..ea4d19c 100644 --- a/setup.py +++ b/setup.py @@ -12,9 +12,9 @@ "pandas", "numpy", "scipy", - "dbt-core", - "dbt-duckdb", - "dbt-snowflake", + "dbt-core~=1.7.0", + "dbt-duckdb~=1.7.5", + "dbt-snowflake~=1.7.0", "dagster-duckdb", "dagster-aws", "dagster-duckdb-pandas", From b2b2d094478740bec1e26bdf8019211f926198bc Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Thu, 9 May 2024 15:11:50 -0400 Subject: [PATCH 7/9] lock dbt to <1.8.0 --- .github/workflows/deploy-dagster-cloud.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/deploy-dagster-cloud.yml b/.github/workflows/deploy-dagster-cloud.yml index 756f159..d0d4692 100644 --- a/.github/workflows/deploy-dagster-cloud.yml +++ b/.github/workflows/deploy-dagster-cloud.yml @@ -78,7 +78,7 @@ jobs: if: steps.prerun.outputs.result != 'skip' run: | pip install pip --upgrade; - pip install dagster-dbt dagster-cloud dbt-core dbt-duckdb dbt-snowflake --upgrade --upgrade-strategy eager; + pip install dagster-dbt dagster-cloud dbt-core~=1.7.0 dbt-duckdb~=1.7.5 dbt-snowflake~=1.7.4 --upgrade --upgrade-strategy eager; make deps dagster-dbt project prepare-for-deployment --file hooli_data_eng/project.py dagster-cloud ci dagster-dbt project manage-state --file hooli_data_eng/project.py --source-deployment data-eng-prod From 089a93e8fd8810d1b75c1b37b95f27dd9cb5e3de Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Thu, 9 May 2024 15:14:12 -0400 Subject: [PATCH 8/9] lock proper dbt-snowflake version --- .github/workflows/deploy-dagster-cloud.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/deploy-dagster-cloud.yml b/.github/workflows/deploy-dagster-cloud.yml index d0d4692..defdd21 100644 --- a/.github/workflows/deploy-dagster-cloud.yml +++ b/.github/workflows/deploy-dagster-cloud.yml @@ -78,7 +78,7 @@ jobs: if: steps.prerun.outputs.result != 'skip' run: | pip install pip --upgrade; - pip install dagster-dbt dagster-cloud dbt-core~=1.7.0 dbt-duckdb~=1.7.5 dbt-snowflake~=1.7.4 --upgrade --upgrade-strategy eager; + pip install dagster-dbt dagster-cloud dbt-core~=1.7.0 dbt-duckdb~=1.7.5 dbt-snowflake~=1.7.3 --upgrade --upgrade-strategy eager; make deps dagster-dbt project prepare-for-deployment --file hooli_data_eng/project.py dagster-cloud ci dagster-dbt project manage-state --file hooli_data_eng/project.py --source-deployment data-eng-prod From b886fe62623e3fb5bddbcb28e47f4a01f205569b Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Fri, 10 May 2024 09:40:25 -0400 Subject: [PATCH 9/9] use new freshness as a list --- hooli_data_eng/assets/marketing/__init__.py | 5 +++-- hooli_data_eng/definitions.py | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/hooli_data_eng/assets/marketing/__init__.py b/hooli_data_eng/assets/marketing/__init__.py index 0c24652..9b6017e 100644 --- a/hooli_data_eng/assets/marketing/__init__.py +++ b/hooli_data_eng/assets/marketing/__init__.py @@ -87,7 +87,8 @@ def key_product_deepdive(context, sku_stats): min_order_freshness_check = build_last_update_freshness_checks( assets=[min_order, AssetKey(["RAW_DATA", "orders"]), - AssetKey(["RAW_DATA", "users"])], + AssetKey(["RAW_DATA", "users"]) + ], lower_bound_delta=datetime.timedelta( hours=24 ), # expect new data at least once a day @@ -99,7 +100,7 @@ def key_product_deepdive(context, sku_stats): ) min_order_freshness_check_sensor = build_sensor_for_freshness_checks( - freshness_checks=[min_order_freshness_check], + freshness_checks=min_order_freshness_check, minimum_interval_seconds=10*60 ) diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index b736634..36d28c0 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -64,8 +64,8 @@ executor=multiprocess_executor.configured( {"max_concurrent": 3} ), - assets=[*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets], - asset_checks=[*raw_data_schema_checks, *dbt_asset_checks, check_users, check_avg_orders, avg_orders_freshness_check, min_order_freshness_check], + assets=[*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets], # + asset_checks=[*raw_data_schema_checks, *dbt_asset_checks, check_users, check_avg_orders, *min_order_freshness_check, *avg_orders_freshness_check], resources=resource_def[get_env()], schedules=[analytics_schedule, avg_orders_freshness_check_schedule], sensors=[