From 3cc01871902d2a9439f5001edc16a98f6f9abb70 Mon Sep 17 00:00:00 2001 From: Sean Lopp Date: Tue, 1 Aug 2023 07:21:35 -0600 Subject: [PATCH 1/7] wip --- hooli_data_eng/assets/dbt_assets.py | 1 + 1 file changed, 1 insertion(+) diff --git a/hooli_data_eng/assets/dbt_assets.py b/hooli_data_eng/assets/dbt_assets.py index 58fcc4d..94bf4b3 100644 --- a/hooli_data_eng/assets/dbt_assets.py +++ b/hooli_data_eng/assets/dbt_assets.py @@ -2,6 +2,7 @@ from typing import Any, Mapping from dagster._utils import file_relative_path from dagster_dbt import DbtCliResource, DagsterDbtTranslator +from dagster_dbt import DbtCliResource, DagsterDbtTranslator from dagster_dbt import load_assets_from_dbt_project from dagster_dbt.asset_decorator import dbt_assets from dagster import AssetKey, DailyPartitionsDefinition, WeeklyPartitionsDefinition, OpExecutionContext, Output, MetadataValue From 92e1adee6ba00538d5adc4d38954eb38fb4f8c1a Mon Sep 17 00:00:00 2001 From: Sean Lopp Date: Fri, 1 Sep 2023 11:09:31 -0600 Subject: [PATCH 2/7] first pass at adding checks --- hooli_data_eng/assets/marketing/__init__.py | 15 +++++++++++++-- hooli_data_eng/assets/raw_data/__init__.py | 12 +++++++++++- hooli_data_eng/definitions.py | 3 +++ 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/hooli_data_eng/assets/marketing/__init__.py b/hooli_data_eng/assets/marketing/__init__.py index f907d1b..b7dc894 100644 --- a/hooli_data_eng/assets/marketing/__init__.py +++ b/hooli_data_eng/assets/marketing/__init__.py @@ -1,4 +1,4 @@ -from dagster import asset, FreshnessPolicy, AssetIn, DynamicPartitionsDefinition, MetadataValue, AutoMaterializePolicy +from dagster import asset, FreshnessPolicy, AssetIn, DynamicPartitionsDefinition, MetadataValue, AutoMaterializePolicy, AssetExecutionContext, AssetCheckResult, AssetCheckSeverity, asset_check import pandas as pd # These assets take data from a SQL table managed by @@ -13,13 +13,24 @@ op_tags={"owner": "bi@hooli.com"}, ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])} ) -def avg_orders(company_perf: pd.DataFrame) -> pd.DataFrame: +def avg_orders(context: AssetExecutionContext, company_perf: pd.DataFrame) -> pd.DataFrame: """ Computes avg order KPI, must be updated regularly for exec dashboard """ + return pd.DataFrame({ "avg_order": company_perf['total_revenue'] / company_perf['n_orders'] }) +@asset_check( + description="check that avg ordres are expected", + severity=AssetCheckSeverity.WARN, +) +def check_avg_orders(context, avg_orders: pd.DataFrame): + avg = avg_orders['avg_order'][0] + return AssetCheckResult( + success= (avg < 50), + metadata={"actual average": avg, "threshold": 50} + ) @asset( key_prefix="MARKETING", diff --git a/hooli_data_eng/assets/raw_data/__init__.py b/hooli_data_eng/assets/raw_data/__init__.py index a7632b9..cfd989f 100644 --- a/hooli_data_eng/assets/raw_data/__init__.py +++ b/hooli_data_eng/assets/raw_data/__init__.py @@ -1,5 +1,5 @@ import pandas as pd -from dagster import asset, RetryPolicy, Backoff, Jitter, DailyPartitionsDefinition, OpExecutionContext, build_op_context, build_resources +from dagster import asset, RetryPolicy, Backoff, Jitter, DailyPartitionsDefinition, OpExecutionContext, build_op_context, build_resources, asset_check, AssetCheckSeverity, AssetCheckResult from datetime import datetime, timedelta from hooli_data_eng.resources.api import RawDataAPI @@ -37,6 +37,16 @@ def users(context, api: RawDataAPI) -> pd.DataFrame: return pd.concat(all_users) +@asset_check( + description="check that users are from expected companies", + severity=AssetCheckSeverity.WARN, +) +def check_users(context, users: pd.DataFrame): + unique_companies = pd.unique(users['company']).tolist() + return AssetCheckResult( + success= (unique_companies == ["FoodCo", "ShopMart", "SportTime", "FamilyLtd"]), + metadata={"companies": unique_companies} + ) @asset( compute_kind="api", diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index a5582f1..109458e 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -3,6 +3,8 @@ from dagster_pyspark import pyspark_resource from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets +from hooli_data_eng.assets.raw_data import check_users +from hooli_data_eng.assets.marketing import check_avg_orders from hooli_data_eng.assets.delayed_asset_alerts import asset_delay_alert_sensor from hooli_data_eng.resources.sensor_file_managers import s3FileSystem, LocalFileSystem from hooli_data_eng.resources.sensor_smtp import LocalEmailAlert, SESEmailAlert @@ -232,6 +234,7 @@ def orders_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry): {"max_concurrent": 3} ), assets=[*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets], + asset_checks=[check_users, check_avg_orders], resources=resource_def[get_env()], schedules=[analytics_schedule], sensors=[ From 04cacb3ea73db932e7bda1cb786a966460ef884b Mon Sep 17 00:00:00 2001 From: Sean Lopp Date: Fri, 1 Sep 2023 11:40:59 -0600 Subject: [PATCH 3/7] add check to basics --- hooli_basics/definitions.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/hooli_basics/definitions.py b/hooli_basics/definitions.py index 3805f3d..2206f83 100644 --- a/hooli_basics/definitions.py +++ b/hooli_basics/definitions.py @@ -1,4 +1,4 @@ -from dagster import asset +from dagster import asset, asset_check, AssetCheckResult, Definitions from pandas import DataFrame, read_html, get_dummies, to_numeric from sklearn.linear_model import LinearRegression as Regression @@ -9,15 +9,28 @@ def country_stats() -> DataFrame: df["pop_change"] = ((to_numeric(df["pop_2023"]) / to_numeric(df["pop_2022"])) - 1)*100 return df +@asset_check( + +) +def check_country_stats(country_stats): + return AssetCheckResult(success=True) + @asset def change_model(country_stats: DataFrame) -> Regression: data = country_stats.dropna(subset=["pop_change"]) dummies = get_dummies(data[["continent"]]) return Regression().fit(dummies, data["pop_change"]) + + @asset def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataFrame: result = country_stats.groupby("continent").sum() result["pop_change_factor"] = change_model.coef_ return result + +defs = Definitions( + assets=[country_stats, continent_stats, change_model], + asset_checks=[check_country_stats] +) \ No newline at end of file From 2985d24e52a14cc548cd4d081d53bfb65ffc14d3 Mon Sep 17 00:00:00 2001 From: Sean Lopp Date: Thu, 14 Sep 2023 14:50:18 -0600 Subject: [PATCH 4/7] updated checks with latest API --- hooli_data_eng/assets/marketing/__init__.py | 4 ++-- hooli_data_eng/assets/raw_data/__init__.py | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/hooli_data_eng/assets/marketing/__init__.py b/hooli_data_eng/assets/marketing/__init__.py index b7dc894..8c54c9a 100644 --- a/hooli_data_eng/assets/marketing/__init__.py +++ b/hooli_data_eng/assets/marketing/__init__.py @@ -22,8 +22,8 @@ def avg_orders(context: AssetExecutionContext, company_perf: pd.DataFrame) -> pd }) @asset_check( - description="check that avg ordres are expected", - severity=AssetCheckSeverity.WARN, + description="check that avg orders are expected", + asset=avg_orders ) def check_avg_orders(context, avg_orders: pd.DataFrame): avg = avg_orders['avg_order'][0] diff --git a/hooli_data_eng/assets/raw_data/__init__.py b/hooli_data_eng/assets/raw_data/__init__.py index cfd989f..ace4c34 100644 --- a/hooli_data_eng/assets/raw_data/__init__.py +++ b/hooli_data_eng/assets/raw_data/__init__.py @@ -1,5 +1,5 @@ import pandas as pd -from dagster import asset, RetryPolicy, Backoff, Jitter, DailyPartitionsDefinition, OpExecutionContext, build_op_context, build_resources, asset_check, AssetCheckSeverity, AssetCheckResult +from dagster import asset, RetryPolicy, Backoff, Jitter, DailyPartitionsDefinition, OpExecutionContext, build_op_context, build_resources, asset_check, AssetCheckSeverity, AssetCheckResult, AssetKey from datetime import datetime, timedelta from hooli_data_eng.resources.api import RawDataAPI @@ -38,14 +38,16 @@ def users(context, api: RawDataAPI) -> pd.DataFrame: return pd.concat(all_users) @asset_check( + asset=AssetKey(["RAW_DATA", "users"]), description="check that users are from expected companies", - severity=AssetCheckSeverity.WARN, + #severity=AssetCheckSeverity.WARN, ) def check_users(context, users: pd.DataFrame): unique_companies = pd.unique(users['company']).tolist() return AssetCheckResult( success= (unique_companies == ["FoodCo", "ShopMart", "SportTime", "FamilyLtd"]), - metadata={"companies": unique_companies} + metadata={"companies": unique_companies}, + severity=AssetCheckSeverity.WARN ) @asset( From 954dd8ce56a9b21bcfde7ccb478ffcafcfea2f4e Mon Sep 17 00:00:00 2001 From: Sean Lopp Date: Thu, 14 Sep 2023 16:23:34 -0600 Subject: [PATCH 5/7] fix basics and avg_order numpy boolean issue --- hooli_basics/definitions.py | 2 +- hooli_data_eng/assets/marketing/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hooli_basics/definitions.py b/hooli_basics/definitions.py index 2206f83..fb68f7a 100644 --- a/hooli_basics/definitions.py +++ b/hooli_basics/definitions.py @@ -10,7 +10,7 @@ def country_stats() -> DataFrame: return df @asset_check( - + asset=country_stats ) def check_country_stats(country_stats): return AssetCheckResult(success=True) diff --git a/hooli_data_eng/assets/marketing/__init__.py b/hooli_data_eng/assets/marketing/__init__.py index 8c54c9a..55380fe 100644 --- a/hooli_data_eng/assets/marketing/__init__.py +++ b/hooli_data_eng/assets/marketing/__init__.py @@ -28,7 +28,7 @@ def avg_orders(context: AssetExecutionContext, company_perf: pd.DataFrame) -> pd def check_avg_orders(context, avg_orders: pd.DataFrame): avg = avg_orders['avg_order'][0] return AssetCheckResult( - success= (avg < 50), + success= True if (avg < 50) else False, metadata={"actual average": avg, "threshold": 50} ) From 871c5712837218a7ada59aecaee91b7a9c42089e Mon Sep 17 00:00:00 2001 From: izzy <60406698+izzye84@users.noreply.github.com> Date: Fri, 15 Sep 2023 13:46:15 -0600 Subject: [PATCH 6/7] Update setup.py beautifulsoup4 required for the country_stats asset --- setup.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 6d0067f..1eb1ac5 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,8 @@ "responses", "requests", "html5lib", - "scikit-learn" + "scikit-learn", + "beautifulsoup4" ], extras_require={"dev": ["dagit", "pytest"]}, - ) \ No newline at end of file + ) From 086bb5b4085a5f39ddfc36493de1b3ba4fea332b Mon Sep 17 00:00:00 2001 From: izzy Date: Fri, 15 Sep 2023 14:18:25 -0600 Subject: [PATCH 7/7] Revert "Update setup.py" This reverts commit 871c5712837218a7ada59aecaee91b7a9c42089e. --- setup.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 1eb1ac5..6d0067f 100644 --- a/setup.py +++ b/setup.py @@ -27,8 +27,7 @@ "responses", "requests", "html5lib", - "scikit-learn", - "beautifulsoup4" + "scikit-learn" ], extras_require={"dev": ["dagit", "pytest"]}, - ) + ) \ No newline at end of file