Lopp add checks #39
Closed · wants to merge 7 commits
Changes from all commits
15 changes: 14 additions & 1 deletion hooli_basics/definitions.py
@@ -1,4 +1,4 @@
from dagster import asset
from dagster import asset, asset_check, AssetCheckResult, Definitions
from pandas import DataFrame, read_html, get_dummies, to_numeric
from sklearn.linear_model import LinearRegression as Regression

@@ -9,15 +9,28 @@ def country_stats() -> DataFrame:
    df["pop_change"] = ((to_numeric(df["pop_2023"]) / to_numeric(df["pop_2022"])) - 1)*100
    return df

@asset_check(
    asset=country_stats
)
def check_country_stats(country_stats):
    return AssetCheckResult(success=True)

@asset
def change_model(country_stats: DataFrame) -> Regression:
    data = country_stats.dropna(subset=["pop_change"])
    dummies = get_dummies(data[["continent"]])
    return Regression().fit(dummies, data["pop_change"])


@asset
def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataFrame:
    result = country_stats.groupby("continent").sum()
    result["pop_change_factor"] = change_model.coef_
    return result


defs = Definitions(
    assets=[country_stats, continent_stats, change_model],
    asset_checks=[check_country_stats]
)
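For local verification of the new check in hooli_basics, a minimal sketch along these lines could work, assuming @asset_check-decorated functions can be invoked directly with the checked asset's value (as @asset-decorated functions can); the sample DataFrame is made up and not part of this PR:

```python
import pandas as pd

from hooli_basics.definitions import check_country_stats

# Hypothetical stand-in for the country_stats output; only the shape matters here,
# since the check currently returns success=True unconditionally.
sample = pd.DataFrame({"pop_2022": [100.0], "pop_2023": [103.0], "pop_change": [3.0]})

result = check_country_stats(sample)
assert result.success
```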
1 change: 1 addition & 0 deletions hooli_data_eng/assets/dbt_assets.py
@@ -2,6 +2,7 @@
from typing import Any, Mapping
from dagster._utils import file_relative_path
from dagster_dbt import DbtCliResource, DagsterDbtTranslator
from dagster_dbt import DbtCliResource, DagsterDbtTranslator
from dagster_dbt import load_assets_from_dbt_project
from dagster_dbt.asset_decorator import dbt_assets
from dagster import AssetKey, DailyPartitionsDefinition, WeeklyPartitionsDefinition, OpExecutionContext, Output, MetadataValue
15 changes: 13 additions & 2 deletions hooli_data_eng/assets/marketing/__init__.py
@@ -1,4 +1,4 @@
from dagster import asset, FreshnessPolicy, AssetIn, DynamicPartitionsDefinition, MetadataValue, AutoMaterializePolicy
from dagster import asset, FreshnessPolicy, AssetIn, DynamicPartitionsDefinition, MetadataValue, AutoMaterializePolicy, AssetExecutionContext, AssetCheckResult, AssetCheckSeverity, asset_check
import pandas as pd

# These assets take data from a SQL table managed by
@@ -13,13 +13,24 @@
    op_tags={"owner": "[email protected]"},
    ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])}
)
def avg_orders(company_perf: pd.DataFrame) -> pd.DataFrame:
def avg_orders(context: AssetExecutionContext, company_perf: pd.DataFrame) -> pd.DataFrame:
    """ Computes avg order KPI, must be updated regularly for exec dashboard """


    return pd.DataFrame({
        "avg_order": company_perf['total_revenue'] / company_perf['n_orders']
    })

@asset_check(
    description="check that avg orders are expected",
    asset=avg_orders
)
def check_avg_orders(context, avg_orders: pd.DataFrame):
    avg = avg_orders['avg_order'][0]
    return AssetCheckResult(
        success= True if (avg < 50) else False,
        metadata={"actual average": avg, "threshold": 50}
    )

@asset(
    key_prefix="MARKETING",
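As a plain-pandas illustration of what check_avg_orders asserts (no Dagster APIs involved; the revenue and order figures are synthetic):

```python
import pandas as pd

# Synthetic stand-in for the company_perf table read by avg_orders.
company_perf = pd.DataFrame({"total_revenue": [1200.0, 900.0], "n_orders": [30, 25]})

# Same KPI computation as the avg_orders asset.
avg_orders = pd.DataFrame({"avg_order": company_perf["total_revenue"] / company_perf["n_orders"]})

avg = avg_orders["avg_order"][0]   # first row, as in check_avg_orders
print(avg, avg < 50)               # 40.0 True -> the check would pass
```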
14 changes: 13 additions & 1 deletion hooli_data_eng/assets/raw_data/__init__.py
@@ -1,5 +1,5 @@
import pandas as pd
from dagster import asset, RetryPolicy, Backoff, Jitter, DailyPartitionsDefinition, OpExecutionContext, build_op_context, build_resources
from dagster import asset, RetryPolicy, Backoff, Jitter, DailyPartitionsDefinition, OpExecutionContext, build_op_context, build_resources, asset_check, AssetCheckSeverity, AssetCheckResult, AssetKey
from datetime import datetime, timedelta
from hooli_data_eng.resources.api import RawDataAPI

@@ -37,6 +37,18 @@ def users(context, api: RawDataAPI) -> pd.DataFrame:

    return pd.concat(all_users)

@asset_check(
    asset=AssetKey(["RAW_DATA", "users"]),
    description="check that users are from expected companies",
    #severity=AssetCheckSeverity.WARN,
)
def check_users(context, users: pd.DataFrame):
    unique_companies = pd.unique(users['company']).tolist()
    return AssetCheckResult(
        success= (unique_companies == ["FoodCo", "ShopMart", "SportTime", "FamilyLtd"]),
        metadata={"companies": unique_companies},
        severity=AssetCheckSeverity.WARN
    )

@asset(
    compute_kind="api",
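One thing worth noting about check_users: pd.unique preserves order of first appearance, so the list comparison is order-sensitive. A short sketch with synthetic data, showing an order-insensitive variant in case upstream ordering is not guaranteed (this is an illustration, not part of the PR):

```python
import pandas as pd

# Synthetic users frame; in the pipeline this comes from RawDataAPI.
users = pd.DataFrame({"company": ["ShopMart", "FoodCo", "ShopMart", "SportTime", "FamilyLtd"]})

unique_companies = pd.unique(users["company"]).tolist()
# The PR's check: exact, order-sensitive list equality.
strict = unique_companies == ["FoodCo", "ShopMart", "SportTime", "FamilyLtd"]          # False here
# Order-insensitive alternative.
relaxed = set(unique_companies) == {"FoodCo", "ShopMart", "SportTime", "FamilyLtd"}    # True
print(strict, relaxed)
```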
3 changes: 3 additions & 0 deletions hooli_data_eng/definitions.py
@@ -3,6 +3,8 @@
from dagster_pyspark import pyspark_resource

from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets
from hooli_data_eng.assets.raw_data import check_users
from hooli_data_eng.assets.marketing import check_avg_orders
from hooli_data_eng.assets.delayed_asset_alerts import asset_delay_alert_sensor
from hooli_data_eng.resources.sensor_file_managers import s3FileSystem, LocalFileSystem
from hooli_data_eng.resources.sensor_smtp import LocalEmailAlert, SESEmailAlert
@@ -232,6 +234,7 @@ def orders_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
        {"max_concurrent": 3}
    ),
    assets=[*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets],
    asset_checks=[check_users, check_avg_orders],
    resources=resource_def[get_env()],
    schedules=[analytics_schedule],
    sensors=[