Add asset checks and project cleanup (#41)
* ignore local duckdb and dbt logs

* moved sensor related files to the sensors directory

* moved analytics_job and predict_job out of definitions.py

* moved analytics_schedule to this directory for better organization

* added check_avg_orders

* added check_users asset check

* added check_country_stats asset check

* moved jobs, resources, schedules, and sensor logic to their respective directories

* moved resources from definitions.py to resources directory
izzye84 authored Sep 27, 2023
1 parent dd52575 commit e4b252f
Showing 13 changed files with 250 additions and 186 deletions.
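
The reorganization described in the commit message implies a package layout roughly like the following (a sketch inferred from the new imports in hooli_data_eng/definitions.py below; the tree itself is not part of this commit):

    hooli_data_eng/
        definitions.py
        assets/                          # forecasting, raw_data, marketing, dbt_assets (+ new asset checks)
        jobs/__init__.py                 # analytics_job, predict_job
        resources/__init__.py            # get_env, resource_def
        schedules/__init__.py            # analytics_schedule
        sensors/__init__.py              # orders_sensor
        sensors/delayed_asset_alerts.py  # asset_delay_alert_sensor
        sensors/watch_s3.py              # watch_s3_sensor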
6 changes: 5 additions & 1 deletion .gitignore
@@ -168,4 +168,8 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

tmp*/
tmp*/

# dbt
dbt_project/example.duckdb
dbt_project/logs
13 changes: 12 additions & 1 deletion hooli_basics/definitions.py
@@ -1,4 +1,4 @@
from dagster import asset
from dagster import asset, asset_check, AssetCheckResult, Definitions
from pandas import DataFrame, read_html, get_dummies, to_numeric
from sklearn.linear_model import LinearRegression as Regression

@@ -9,6 +9,12 @@ def country_stats() -> DataFrame:
df["pop_change"] = ((to_numeric(df["pop_2023"]) / to_numeric(df["pop_2022"])) - 1)*100
return df

@asset_check(
asset=country_stats
)
def check_country_stats(country_stats):
return AssetCheckResult(success=True)

@asset
def change_model(country_stats: DataFrame) -> Regression:
data = country_stats.dropna(subset=["pop_change"])
@@ -21,3 +27,8 @@ def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataFrame:
result["pop_change_factor"] = change_model.coef_
return result

defs = Definitions(
assets=[country_stats, continent_stats, change_model],
asset_checks=[check_country_stats]
)
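
The check above always passes; a more substantive variant might assert a property of the data. The sketch below is illustrative only (not part of this commit) and reuses the imports already shown in this file:

    @asset_check(asset=country_stats)
    def check_country_stats_no_nulls(country_stats: DataFrame):
        # Hypothetical check: fail if any computed pop_change values are missing
        missing = int(country_stats["pop_change"].isna().sum())
        return AssetCheckResult(
            success=(missing == 0),
            metadata={"missing_pop_change_rows": missing},
        )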

24 changes: 22 additions & 2 deletions hooli_data_eng/assets/marketing/__init__.py
@@ -1,4 +1,14 @@
from dagster import asset, FreshnessPolicy, AssetIn, DynamicPartitionsDefinition, MetadataValue, AutoMaterializePolicy
from dagster import (
asset,
FreshnessPolicy,
AssetIn,
DynamicPartitionsDefinition,
MetadataValue,
AutoMaterializePolicy,
AssetExecutionContext,
AssetCheckResult,
asset_check
)
import pandas as pd

# These assets take data from a SQL table managed by
@@ -13,13 +23,23 @@
op_tags={"owner": "[email protected]"},
ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])}
)
def avg_orders(company_perf: pd.DataFrame) -> pd.DataFrame:
def avg_orders(context: AssetExecutionContext, company_perf: pd.DataFrame) -> pd.DataFrame:
""" Computes avg order KPI, must be updated regularly for exec dashboard """

return pd.DataFrame({
"avg_order": company_perf['total_revenue'] / company_perf['n_orders']
})

@asset_check(
description="check that avg orders are expected",
asset=avg_orders
)
def check_avg_orders(context, avg_orders: pd.DataFrame):
avg = avg_orders['avg_order'][0]
return AssetCheckResult(
success= True if (avg < 50) else False,
metadata={"actual average": avg, "threshold": 50}
)

@asset(
key_prefix="MARKETING",
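
The pass/fail condition in check_avg_orders is plain pandas and can be exercised outside of Dagster; a minimal sketch using a hypothetical helper (not part of this commit):

    import pandas as pd

    def avg_order_within_threshold(avg_orders: pd.DataFrame, threshold: float = 50.0) -> bool:
        # Mirrors the check's condition: pass when the first avg_order value is below the threshold
        return bool(avg_orders["avg_order"][0] < threshold)

    assert avg_order_within_threshold(pd.DataFrame({"avg_order": [42.0]}))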
29 changes: 27 additions & 2 deletions hooli_data_eng/assets/raw_data/__init__.py
@@ -1,6 +1,19 @@
from datetime import timedelta

from dagster import (
asset,
asset_check,
AssetCheckSeverity,
AssetCheckResult,
AssetKey,
Backoff,
DailyPartitionsDefinition,
Jitter,
RetryPolicy,
)
import pandas as pd
from dagster import asset, RetryPolicy, Backoff, Jitter, DailyPartitionsDefinition, OpExecutionContext, build_op_context, build_resources
from datetime import datetime, timedelta


from hooli_data_eng.resources.api import RawDataAPI


@@ -37,6 +50,18 @@ def users(context, api: RawDataAPI) -> pd.DataFrame:

return pd.concat(all_users)

@asset_check(
asset=AssetKey(["RAW_DATA", "users"]),
description="check that users are from expected companies",
#severity=AssetCheckSeverity.WARN,
)
def check_users(context, users: pd.DataFrame):
unique_companies = pd.unique(users['company']).tolist()
return AssetCheckResult(
success= (unique_companies == ["FoodCo", "ShopMart", "SportTime", "FamilyLtd"]),
metadata={"companies": unique_companies},
severity=AssetCheckSeverity.WARN
)

@asset(
compute_kind="api",
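
Note that the equality in check_users compares ordered lists, so the result also depends on the order pd.unique returns; an order-insensitive variant (illustrative only, not part of this commit) could compare sets instead:

    expected = {"FoodCo", "ShopMart", "SportTime", "FamilyLtd"}
    unique_companies = set(pd.unique(users["company"]))
    passed = unique_companies == expected  # True regardless of row ordering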
190 changes: 10 additions & 180 deletions hooli_data_eng/definitions.py
@@ -1,43 +1,20 @@
import os

from dagster_pyspark import pyspark_resource

from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets
from hooli_data_eng.assets.delayed_asset_alerts import asset_delay_alert_sensor
from hooli_data_eng.resources.sensor_file_managers import s3FileSystem, LocalFileSystem
from hooli_data_eng.resources.sensor_smtp import LocalEmailAlert, SESEmailAlert
from hooli_data_eng.resources.databricks import db_step_launcher
from hooli_data_eng.resources.api import RawDataAPI
from hooli_data_eng.jobs.watch_s3 import watch_s3_sensor
from dagster_duckdb_pandas import DuckDBPandasIOManager
from dagster_dbt import DbtCliClientResource
from hooli_data_eng.resources.dbt import DbtCli2 as DbtCli
from dagster_snowflake_pandas import SnowflakePandasIOManager
#from hooli_data_eng.resources.warehouse import MySnowflakeIOManager as SnowflakePandasIOManager
from dagster_aws.s3 import ConfigurablePickledObjectS3IOManager, S3Resource
from dagstermill import ConfigurableLocalOutputNotebookIOManager


from dagster import (
build_schedule_from_partitioned_job,
AssetSelection,
Definitions,
EnvVar,
EventLogEntry,
RunRequest,
SensorEvaluationContext,
ResourceDefinition,
asset_sensor,
define_asset_job,
FilesystemIOManager,
load_assets_from_modules,
load_assets_from_package_module,
AssetKey,
AutoMaterializePolicy,
multiprocess_executor,
)

from dagster._utils import file_relative_path
from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets
from hooli_data_eng.assets.marketing import check_avg_orders
from hooli_data_eng.assets.raw_data import check_users
from hooli_data_eng.jobs import analytics_job, predict_job
from hooli_data_eng.resources import get_env, resource_def
from hooli_data_eng.schedules import analytics_schedule
from hooli_data_eng.sensors import orders_sensor
from hooli_data_eng.sensors.delayed_asset_alerts import asset_delay_alert_sensor
from hooli_data_eng.sensors.watch_s3 import watch_s3_sensor

# ---------------------------------------------------
# Assets
@@ -62,10 +39,6 @@
# specifies what databases to targets, and locally will
# execute against a DuckDB


DBT_PROJECT_DIR = file_relative_path(__file__, "../dbt_project")
DBT_PROFILES_DIR = file_relative_path(__file__, "../dbt_project/config")

dbt_assets = load_assets_from_modules([dbt_assets])

# Our final set of assets represent Python code that
@@ -77,150 +50,6 @@

marketing_assets = load_assets_from_package_module(marketing, group_name="MARKETING")

# ---------------------------------------------------
# Resources

# Resources represent external systems, and specifically IO Managers
# tell dagster where our assets should be materialized. In dagster
# resources are separate from logical code to make it possible
# to develop locally, run tests, and run integration tests
#
# This project is designed for everything to run locally
# using the file system and DuckDB as the primary development resources
#
# PRs use a "branch" environment that mirrors production with
# staging Snowflake and S3 resources
#
# The production deployment on Dagster Cloud uses production Snowflake
# and S3 resources


def get_env():
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
return "BRANCH"
if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod":
return "PROD"
return "LOCAL"


# Similar to having different dbt targets, here we create the resource
# configuration by environment

resource_def = {
"LOCAL": {
"io_manager": DuckDBPandasIOManager(
database=os.path.join(DBT_PROJECT_DIR, "example.duckdb")
),
"model_io_manager": FilesystemIOManager(),
"output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(),
"api": RawDataAPI.configure_at_launch(),
"s3": ResourceDefinition.none_resource(),
"dbt": DbtCliClientResource(
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"
),
"dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"),
"pyspark": pyspark_resource,
"step_launcher": ResourceDefinition.none_resource(),
"monitor_fs": LocalFileSystem(base_dir=file_relative_path(__file__, ".")),
"email": LocalEmailAlert(
smtp_email_to=["[email protected]"], smtp_email_from="[email protected]"
),
},
"BRANCH": {
"io_manager": SnowflakePandasIOManager(
database="DEMO_DB2_BRANCH",
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
warehouse="TINY_WAREHOUSE",
),
"model_io_manager": ConfigurablePickledObjectS3IOManager(
s3_bucket="hooli-demo-branch",
s3_resource=S3Resource(region_name="us-west-2"),
),
"output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(),
"api": RawDataAPI.configure_at_launch(),
"dbt": DbtCliClientResource(
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"
),
"dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"),
"pyspark": pyspark_resource,
"step_launcher": db_step_launcher,
"monitor_fs": s3FileSystem(
region_name="us-west-2", s3_bucket="hooli-demo-branch"
),
"email": ResourceDefinition.none_resource(),
},
"PROD": {
"io_manager": SnowflakePandasIOManager(
database="DEMO_DB2",
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
warehouse="TINY_WAREHOUSE",
),
"model_io_manager": ConfigurablePickledObjectS3IOManager(
s3_bucket="hooli-demo-branch",
s3_resource=S3Resource(region_name="us-west-2"),
),
"output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(),
"api": RawDataAPI.configure_at_launch(),
"dbt": DbtCliClientResource(
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"
),
"dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"),
"pyspark": pyspark_resource,
"step_launcher": db_step_launcher,
"monitor_fs": s3FileSystem(region_name="us-west-2", s3_bucket="hooli-demo"),
"email": SESEmailAlert(
smtp_host="email-smtp.us-west-2.amazonaws.com",
smtp_email_from="[email protected]",
smtp_email_to=["[email protected]"],
smtp_username=EnvVar("SMTP_USERNAME"),
smtp_password=EnvVar("SMTP_PASSWORD"),
),
},
}
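
The resource configuration above was relocated rather than deleted; based on the new import of get_env and resource_def from hooli_data_eng.resources, a plausible hooli_data_eng/resources/__init__.py (reconstruction, not shown in this diff) begins:

    import os

    def get_env():
        if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
            return "BRANCH"
        if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod":
            return "PROD"
        return "LOCAL"

    # resource_def = {"LOCAL": {...}, "BRANCH": {...}, "PROD": {...}}  # moved over unchanged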

# ---------------------------------------------------
# Jobs and Sensors

# With assets defined we have everything to run Dagster
# ourselves if we wanted to manually create assets.
# Most of the time you will want to automate asset creation.
# In dagster, jobs allow you to update all or some assets.
# Jobs can be run on a schedule, or in response to an external
# event using a sensor.

# This job updates all of the assets upstream of "orders_augmented",
# which is an asset representing a model in dbt
analytics_job = define_asset_job(
name="refresh_analytics_model_job",
selection=AssetSelection.keys(["ANALYTICS", "orders_augmented"]).upstream(),
tags={"dagster/max_retries": "1"},
# config = {"execution": {"config": {"multiprocess": {"max_concurrent": 1}}}}
)

# This schedule tells dagster to run the analytics_job daily
analytics_schedule = build_schedule_from_partitioned_job(analytics_job)

# This job selects the predicted_orders asset defined in
# assets/forecasting/__init__.py
predict_job = define_asset_job(
"predict_job",
selection=AssetSelection.keys(["FORECASTING", "predicted_orders"]),
tags={"alert_team": "ml"},
)


# This sensor listens for changes to the orders_augmented asset which
# represents a dbt model. When the table managed by dbt is updated,
# this sensor will trigger the predict_job above, ensuring that anytime
# new order data is produced the forecast is updated
@asset_sensor(asset_key=AssetKey(["ANALYTICS", "orders_augmented"]), job=predict_job)
def orders_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
yield RunRequest(run_key=context.cursor)
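
This sensor, too, now lives in the sensors package; a plausible hooli_data_eng/sensors/__init__.py based on the code removed above (reconstruction, not shown in this diff):

    from dagster import AssetKey, EventLogEntry, RunRequest, SensorEvaluationContext, asset_sensor
    from hooli_data_eng.jobs import predict_job

    # Trigger the forecast job whenever the dbt-managed orders_augmented table is updated
    @asset_sensor(asset_key=AssetKey(["ANALYTICS", "orders_augmented"]), job=predict_job)
    def orders_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
        yield RunRequest(run_key=context.cursor)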

# ---------------------------------------------------
# Definitions

@@ -232,6 +61,7 @@ def orders_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
{"max_concurrent": 3}
),
assets=[*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets],
asset_checks=[check_users, check_avg_orders],
resources=resource_def[get_env()],
schedules=[analytics_schedule],
sensors=[
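
The tail of the new Definitions call is truncated in this view; a plausible reconstruction based on the imports at the top of the file (an assumption, not shown in the diff):

    defs = Definitions(
        executor=multiprocess_executor.configured({"max_concurrent": 3}),
        assets=[*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets],
        asset_checks=[check_users, check_avg_orders],
        resources=resource_def[get_env()],
        schedules=[analytics_schedule],
        sensors=[orders_sensor, watch_s3_sensor, asset_delay_alert_sensor],
    )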
25 changes: 25 additions & 0 deletions hooli_data_eng/jobs/__init__.py
@@ -0,0 +1,25 @@
from dagster import AssetSelection, define_asset_job

# With assets defined we have everything to run Dagster
# ourselves if we wanted to manually create assets.
# Most of the time you will want to automate asset creation.
# In dagster, jobs allow you to update all or some assets.
# Jobs can be run on a schedule, or in response to an external
# event using a sensor.

# This job updates all of the assets upstream of "orders_augmented",
# which is an asset representing a model in dbt
analytics_job = define_asset_job(
name="refresh_analytics_model_job",
selection=AssetSelection.keys(["ANALYTICS", "orders_augmented"]).upstream(),
tags={"dagster/max_retries": "1"},
# config = {"execution": {"config": {"multiprocess": {"max_concurrent": 1}}}}
)

# This job selects the predicted_orders asset defined in
# assets/forecasting/__init__.py
predict_job = define_asset_job(
"predict_job",
selection=AssetSelection.keys(["FORECASTING", "predicted_orders"]),
tags={"alert_team": "ml"},
)
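
The analytics_schedule imported by definitions.py was moved in this same commit; a plausible hooli_data_eng/schedules/__init__.py, based on the code removed from definitions.py above (reconstruction, not shown in this diff):

    from dagster import build_schedule_from_partitioned_job
    from hooli_data_eng.jobs import analytics_job

    # Run the analytics job on the cadence of its daily partitions
    analytics_schedule = build_schedule_from_partitioned_job(analytics_job)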