Commit 61abddf
Merge pull request #89 from dagster-io/add-schema-checks
Add schema checks
cnolanminich committed May 14, 2024
2 parents 21f16cc + b886fe6 commit 61abddf
Showing 7 changed files with 26 additions and 13 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/deploy-dagster-cloud.yml
@@ -78,7 +78,7 @@ jobs:
       if: steps.prerun.outputs.result != 'skip'
       run: |
         pip install pip --upgrade;
-        pip install dagster-dbt dagster-cloud dbt-core dbt-duckdb dbt-snowflake --upgrade --upgrade-strategy eager;
+        pip install dagster-dbt dagster-cloud dbt-core~=1.7.0 dbt-duckdb~=1.7.5 dbt-snowflake~=1.7.3 --upgrade --upgrade-strategy eager;
         make deps
         dagster-dbt project prepare-for-deployment --file hooli_data_eng/project.py
         dagster-cloud ci dagster-dbt project manage-state --file hooli_data_eng/project.py --source-deployment data-eng-prod
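The new version specifiers here (and in setup.py below) use pip's compatible-release operator: ~=1.7.0 allows any 1.7.x release at or above 1.7.0 but excludes 1.8. A quick way to confirm that, sketched with the packaging library that pip builds on:

# Sketch: what the "~=" (compatible release) pins in this commit allow.
from packaging.specifiers import SpecifierSet

dbt_core_pin = SpecifierSet("~=1.7.0")       # equivalent to >=1.7.0, ==1.7.*
print("1.7.14" in dbt_core_pin)   # True  -- newer 1.7.x patch releases still install
print("1.8.0" in dbt_core_pin)    # False -- the next minor release is excluded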
6 changes: 3 additions & 3 deletions dbt_project/package-lock.yml
@@ -2,8 +2,8 @@ packages:
 - package: calogica/dbt_expectations
   version: 0.10.3
 - git: https://github.com/dagster-io/dagster.git
-  revision: 0ffe859627ccb83c7fb4d9eae17ef55f5fdd6bef
+  revision: b75059b249d1388f99bd49b1b46f9c99686a3a6f
   subdirectory: python_modules/libraries/dagster-dbt/dbt_packages/dagster
 - package: calogica/dbt_date
-  version: 0.10.0
-sha1_hash: d4d1f8c5fedc2d6f6b2284a96b38abefe890c658
+  version: 0.10.1
+sha1_hash: 0da1ef521792fc305eb81faabcffbc88ce85ec0e
2 changes: 1 addition & 1 deletion dbt_project/packages.yml
@@ -3,4 +3,4 @@ packages:
   version: 0.10.3
 - git: "https://github.com/dagster-io/dagster.git"
   subdirectory: "python_modules/libraries/dagster-dbt/dbt_packages/dagster"
-  revision: "1.7.1" # replace with the version of `dagster` you are using.
+  revision: "1.7.5" # replace with the version of `dagster` you are using.
8 changes: 6 additions & 2 deletions hooli_data_eng/assets/marketing/__init__.py
@@ -8,6 +8,7 @@
     AssetExecutionContext,
     AssetCheckResult,
     asset_check,
+    AssetKey,
     FreshnessPolicy,
     define_asset_job,
     ScheduleDefinition,
@@ -84,7 +85,10 @@ def key_product_deepdive(context, sku_stats):


 min_order_freshness_check = build_last_update_freshness_checks(
-    assets=[min_order],
+    assets=[min_order,
+            AssetKey(["RAW_DATA", "orders"]),
+            AssetKey(["RAW_DATA", "users"])
+    ],
     lower_bound_delta=datetime.timedelta(
         hours=24
     ), # expect new data at least once a day
@@ -96,7 +100,7 @@ def key_product_deepdive(context, sku_stats):
 )

 min_order_freshness_check_sensor = build_sensor_for_freshness_checks(
-    freshness_checks=[min_order_freshness_check],
+    freshness_checks=min_order_freshness_check,
     minimum_interval_seconds=10*60
 )

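Two details in this diff are worth calling out: build_last_update_freshness_checks returns a sequence of check definitions rather than a single object, so the sensor builder now receives the variable directly instead of wrapped in another list, and definitions.py (below) unpacks it with *. A minimal sketch of that wiring, using an illustrative asset key rather than the repository's code:

# Minimal sketch of the freshness-check wiring used above (illustrative asset key).
import datetime

from dagster import (
    AssetKey,
    build_last_update_freshness_checks,
    build_sensor_for_freshness_checks,
)

freshness_checks = build_last_update_freshness_checks(
    assets=[AssetKey(["RAW_DATA", "orders"])],
    lower_bound_delta=datetime.timedelta(hours=24),  # expect new data at least once a day
)  # returns a sequence of AssetChecksDefinition

freshness_sensor = build_sensor_for_freshness_checks(
    freshness_checks=freshness_checks,  # the sequence itself, not [freshness_checks]
    minimum_interval_seconds=10 * 60,
)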
6 changes: 6 additions & 0 deletions hooli_data_eng/assets/raw_data/__init__.py
@@ -6,6 +6,7 @@
     AssetCheckSeverity,
     AssetCheckResult,
     AssetKey,
+    build_column_schema_change_checks,
     BackfillPolicy,
     Backoff,
     DailyPartitionsDefinition,
@@ -99,5 +100,10 @@ def orders(context, api: RawDataAPI) -> pd.DataFrame:
     all_orders_df['dt'] = pd.to_datetime(all_orders_df['dt'], unit = "ms")
     return all_orders_df

+raw_data_schema_checks = build_column_schema_change_checks(assets=[
+    AssetKey(["RAW_DATA", "orders"]),
+    AssetKey(["RAW_DATA", "users"]),
+])
+

 from dagster_dbt import dbt_assets
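build_column_schema_change_checks generates, for each listed asset, a check that compares the column schema recorded on a new materialization against the previous one, so the checked assets need column-schema metadata on their materializations. A hedged sketch of attaching that metadata by hand follows; the asset body, column names, and the explicit dagster/column_schema key are illustrative, and in this project the metadata may instead be attached automatically by the I/O manager:

# Sketch only: recording column-schema metadata so schema-change checks have
# successive schemas to compare (placeholder columns, not the repo's code).
from dagster import (
    AssetKey,
    MaterializeResult,
    MetadataValue,
    TableColumn,
    TableSchema,
    asset,
    build_column_schema_change_checks,
)


@asset(key_prefix=["RAW_DATA"])
def users() -> MaterializeResult:
    # ... build the dataframe, then record its schema as metadata
    return MaterializeResult(
        metadata={
            "dagster/column_schema": MetadataValue.table_schema(
                TableSchema(
                    columns=[
                        TableColumn("user_id", "int"),
                        TableColumn("company", "string"),
                    ]
                )
            )
        }
    )


users_schema_checks = build_column_schema_change_checks(
    assets=[AssetKey(["RAW_DATA", "users"])]
)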
9 changes: 6 additions & 3 deletions hooli_data_eng/definitions.py
@@ -3,13 +3,14 @@
     Definitions,
     load_assets_from_modules,
     load_assets_from_package_module,
+    build_column_schema_change_checks,
     multiprocess_executor,
 )

 from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets
 from hooli_data_eng.assets.dbt_assets import dbt_slim_ci_job
 from hooli_data_eng.assets.marketing import check_avg_orders
-from hooli_data_eng.assets.raw_data import check_users
+from hooli_data_eng.assets.raw_data import check_users, raw_data_schema_checks
 from hooli_data_eng.jobs import analytics_job, predict_job
 from hooli_data_eng.resources import get_env, resource_def
 from hooli_data_eng.schedules import analytics_schedule
@@ -42,6 +43,8 @@

 dbt_assets = load_assets_from_modules([dbt_assets])

+dbt_asset_checks = build_column_schema_change_checks(assets=[*dbt_assets])
+
 # Our final set of assets represent Python code that
 # should run after dbt. These assets are defined in
 # assets/forecasting/__init__.py
@@ -61,8 +64,8 @@
     executor=multiprocess_executor.configured(
         {"max_concurrent": 3}
     ),
-    assets=[*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets],
-    asset_checks=[check_users, check_avg_orders, avg_orders_freshness_check, min_order_freshness_check],
+    assets=[*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets], #
+    asset_checks=[*raw_data_schema_checks, *dbt_asset_checks, check_users, check_avg_orders, *min_order_freshness_check, *avg_orders_freshness_check],
     resources=resource_def[get_env()],
     schedules=[analytics_schedule, avg_orders_freshness_check_schedule],
     sensors=[
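The mixed style in the asset_checks list reflects the return types involved: checks written with the @asset_check decorator (check_users, check_avg_orders) are single definitions and are passed directly, while the build_* factories return sequences of check definitions, hence the * unpacking. A small sketch of the distinction; the check name and logic are illustrative, not the repository's check_users:

# Sketch: a single @asset_check definition vs. a sequence from a build_* factory.
from dagster import (
    AssetCheckResult,
    AssetKey,
    asset_check,
    build_column_schema_change_checks,
)


@asset_check(asset=AssetKey(["RAW_DATA", "users"]))
def users_have_rows() -> AssetCheckResult:
    # illustrative stand-in for a hand-written check such as check_users
    return AssetCheckResult(passed=True)


schema_checks = build_column_schema_change_checks(
    assets=[AssetKey(["RAW_DATA", "users"])]
)  # a sequence of AssetChecksDefinition

all_checks = [users_have_rows, *schema_checks]  # mirrors the asset_checks list above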
6 changes: 3 additions & 3 deletions setup.py
@@ -12,9 +12,9 @@
         "pandas",
         "numpy",
         "scipy",
-        "dbt-core",
-        "dbt-duckdb",
-        "dbt-snowflake",
+        "dbt-core~=1.7.0",
+        "dbt-duckdb~=1.7.5",
+        "dbt-snowflake~=1.7.0",
         "dagster-duckdb",
         "dagster-aws",
         "dagster-duckdb-pandas",
