From 691f8bb49371bb6489ccaf59a20c9141c223b5cf Mon Sep 17 00:00:00 2001 From: izzy Date: Mon, 1 Jul 2024 10:40:32 -0600 Subject: [PATCH 01/15] WIP --- .../hooli_demo_assets/definitions.py | 3 ++- hooli_basics/definitions.py | 3 ++- .../dagster_batch_enrichment/definitions.py | 3 ++- hooli_data_eng/assets/dbt_assets.py | 16 ++++++++++++---- hooli_data_eng/definitions.py | 6 +++++- hooli_snowflake_insights/definitions.py | 3 ++- 6 files changed, 25 insertions(+), 9 deletions(-) diff --git a/hooli-demo-assets/hooli_demo_assets/definitions.py b/hooli-demo-assets/hooli_demo_assets/definitions.py index 3c6aa84c..bdb27cdd 100644 --- a/hooli-demo-assets/hooli_demo_assets/definitions.py +++ b/hooli-demo-assets/hooli_demo_assets/definitions.py @@ -1,6 +1,7 @@ from dagster import ( Definitions, ) +from dagster._core.definitions.metadata import with_source_code_references from hooli_demo_assets.assets.sling import my_sling_assets from hooli_demo_assets.jobs import daily_sling_job @@ -9,7 +10,7 @@ defs = Definitions( - assets=[my_sling_assets], + assets=with_source_code_references([my_sling_assets]), schedules=[daily_sling_assets], jobs=[daily_sling_job], resources={ diff --git a/hooli_basics/definitions.py b/hooli_basics/definitions.py index d5699e24..63dd017b 100644 --- a/hooli_basics/definitions.py +++ b/hooli_basics/definitions.py @@ -1,4 +1,5 @@ from dagster import asset, asset_check, AssetCheckResult, Definitions +from dagster._core.definitions.metadata import with_source_code_references from pandas import DataFrame, read_html, get_dummies, to_numeric from sklearn.linear_model import LinearRegression as Regression @@ -28,7 +29,7 @@ def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataF return result defs = Definitions( - assets=[country_stats, continent_stats, change_model], + assets=with_source_code_references([country_stats, continent_stats, change_model]), asset_checks=[check_country_stats] ) diff --git 
a/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py b/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py index b2a57d25..2f919a8f 100644 --- a/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py +++ b/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py @@ -1,4 +1,5 @@ from dagster import Definitions, define_asset_job, ScheduleDefinition, AssetSelection +from dagster._core.definitions.metadata import with_source_code_references from dagster_batch_enrichment.api import EnrichmentAPI from dagster_batch_enrichment.warehouse import MyWarehouse from dagster_batch_enrichment.assets import raw_data, enriched_data @@ -19,7 +20,7 @@ ) defs = Definitions( - assets=[raw_data, enriched_data], + assets=with_source_code_references([raw_data, enriched_data]), schedules=[run_assets_30min], jobs=[run_assets_job], resources={ diff --git a/hooli_data_eng/assets/dbt_assets.py b/hooli_data_eng/assets/dbt_assets.py index d993d4b5..4674f5f9 100644 --- a/hooli_data_eng/assets/dbt_assets.py +++ b/hooli_data_eng/assets/dbt_assets.py @@ -133,10 +133,12 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt: DbtCliReso @dbt_assets( manifest=DBT_MANIFEST, + project=dbt_project, select="orders_cleaned users_cleaned orders_augmented", partitions_def=daily_partitions, dagster_dbt_translator=CustomDagsterDbtTranslator( - settings=DagsterDbtTranslatorSettings(enable_asset_checks=True) + settings=DagsterDbtTranslatorSettings(enable_asset_checks=True, + enable_code_references=True,) ), backfill_policy=BackfillPolicy.single_run(), ) @@ -146,10 +148,12 @@ def daily_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource): @dbt_assets( manifest=DBT_MANIFEST, + project=dbt_project, select="weekly_order_summary order_stats", partitions_def=weekly_partitions, dagster_dbt_translator=CustomDagsterDbtTranslator( - DagsterDbtTranslatorSettings(enable_asset_checks=True) + DagsterDbtTranslatorSettings(enable_asset_checks=True, + 
enable_code_references=True,) ), backfill_policy=BackfillPolicy.single_run(), ) @@ -164,9 +168,11 @@ def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource): @dbt_assets( manifest=DBT_MANIFEST, + project=dbt_project, select="company_perf sku_stats company_stats locations_cleaned", dagster_dbt_translator=CustomDagsterDbtTranslatorForViews( - DagsterDbtTranslatorSettings(enable_asset_checks=True) + DagsterDbtTranslatorSettings(enable_asset_checks=True, + enable_code_references=True,) ), ) def views_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource): @@ -203,8 +209,10 @@ def dbt_slim_ci(dbt2: DbtCliResource): yield from dbt2.cli( args=dbt_command, manifest=DBT_MANIFEST, + project=dbt_project, dagster_dbt_translator=CustomDagsterDbtTranslator( - DagsterDbtTranslatorSettings(enable_asset_checks=True) + DagsterDbtTranslatorSettings(enable_asset_checks=True, + enable_code_references=True,) ), ).stream().fetch_row_counts().fetch_column_metadata() diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index 34aa6bef..2c3b6007 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -6,6 +6,8 @@ build_column_schema_change_checks, multiprocess_executor, ) +from dagster._core.definitions.metadata import with_source_code_references +from dagster_cloud.metadata.source_code import link_to_git_if_cloud from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets from hooli_data_eng.assets.dbt_assets import dbt_slim_ci_job @@ -65,7 +67,9 @@ executor=multiprocess_executor.configured( {"max_concurrent": 3} ), - assets=[*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets], # + assets=link_to_git_if_cloud( ###TODO add asset_defs per docs—ask Ben) + with_source_code_references([*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets]) + ), asset_checks=[*raw_data_schema_checks, *dbt_asset_checks, check_users, check_avg_orders, *min_order_freshness_check, 
*avg_orders_freshness_check, *weekly_freshness_check], resources=resource_def[get_env()], schedules=[analytics_schedule, avg_orders_freshness_check_schedule], diff --git a/hooli_snowflake_insights/definitions.py b/hooli_snowflake_insights/definitions.py index c5412ec6..585b77c6 100644 --- a/hooli_snowflake_insights/definitions.py +++ b/hooli_snowflake_insights/definitions.py @@ -1,6 +1,7 @@ import os from dagster import Definitions, EnvVar, ResourceDefinition +from dagster._core.definitions.metadata import with_source_code_references from dagster_cloud.dagster_insights import ( create_snowflake_insights_asset_and_schedule, ) @@ -43,7 +44,7 @@ def get_env(): ) defs = Definitions( - assets=[*snowflake_insights_definitions.assets,], + assets=with_source_code_references([*snowflake_insights_definitions.assets,]), schedules=[snowflake_insights_definitions.schedule,], resources=resource_def[get_env()], ) From 0fd9c885aeef6f85d2bc595f9125ade0e69146d4 Mon Sep 17 00:00:00 2001 From: izzy Date: Mon, 1 Jul 2024 15:14:24 -0600 Subject: [PATCH 02/15] add link_to_git_if_cloud --- hooli-demo-assets/hooli_demo_assets/definitions.py | 5 ++++- hooli_basics/definitions.py | 5 ++++- .../dagster_batch_enrichment/definitions.py | 5 ++++- hooli_data_eng/definitions.py | 2 +- hooli_snowflake_insights/definitions.py | 5 ++++- 5 files changed, 17 insertions(+), 5 deletions(-) diff --git a/hooli-demo-assets/hooli_demo_assets/definitions.py b/hooli-demo-assets/hooli_demo_assets/definitions.py index bdb27cdd..f4c0ec05 100644 --- a/hooli-demo-assets/hooli_demo_assets/definitions.py +++ b/hooli-demo-assets/hooli_demo_assets/definitions.py @@ -2,6 +2,7 @@ Definitions, ) from dagster._core.definitions.metadata import with_source_code_references +from dagster_cloud.metadata.source_code import link_to_git_if_cloud from hooli_demo_assets.assets.sling import my_sling_assets from hooli_demo_assets.jobs import daily_sling_job @@ -10,7 +11,9 @@ defs = Definitions( - 
assets=with_source_code_references([my_sling_assets]), + assets=link_to_git_if_cloud( + with_source_code_references([my_sling_assets]) + ), schedules=[daily_sling_assets], jobs=[daily_sling_job], resources={ diff --git a/hooli_basics/definitions.py b/hooli_basics/definitions.py index 63dd017b..0e67af57 100644 --- a/hooli_basics/definitions.py +++ b/hooli_basics/definitions.py @@ -1,5 +1,6 @@ from dagster import asset, asset_check, AssetCheckResult, Definitions from dagster._core.definitions.metadata import with_source_code_references +from dagster_cloud.metadata.source_code import link_to_git_if_cloud from pandas import DataFrame, read_html, get_dummies, to_numeric from sklearn.linear_model import LinearRegression as Regression @@ -29,7 +30,9 @@ def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataF return result defs = Definitions( - assets=with_source_code_references([country_stats, continent_stats, change_model]), + assets=link_to_git_if_cloud( + with_source_code_references([country_stats, continent_stats, change_model]) + ), asset_checks=[check_country_stats] ) diff --git a/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py b/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py index 2f919a8f..ffa15fa9 100644 --- a/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py +++ b/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py @@ -3,6 +3,7 @@ from dagster_batch_enrichment.api import EnrichmentAPI from dagster_batch_enrichment.warehouse import MyWarehouse from dagster_batch_enrichment.assets import raw_data, enriched_data +from dagster_cloud.metadata.source_code import link_to_git_if_cloud # define a job and schedule to run the pipeline @@ -20,7 +21,9 @@ ) defs = Definitions( - assets=with_source_code_references([raw_data, enriched_data]), + assets=link_to_git_if_cloud( + with_source_code_references([raw_data, enriched_data]) + ), schedules=[run_assets_30min], jobs=[run_assets_job], resources={ 
diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index 2c3b6007..12343a31 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -67,7 +67,7 @@ executor=multiprocess_executor.configured( {"max_concurrent": 3} ), - assets=link_to_git_if_cloud( ###TODO add asset_defs per docs—ask Ben) + assets=link_to_git_if_cloud( with_source_code_references([*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets]) ), asset_checks=[*raw_data_schema_checks, *dbt_asset_checks, check_users, check_avg_orders, *min_order_freshness_check, *avg_orders_freshness_check, *weekly_freshness_check], diff --git a/hooli_snowflake_insights/definitions.py b/hooli_snowflake_insights/definitions.py index 585b77c6..5becfd75 100644 --- a/hooli_snowflake_insights/definitions.py +++ b/hooli_snowflake_insights/definitions.py @@ -5,6 +5,7 @@ from dagster_cloud.dagster_insights import ( create_snowflake_insights_asset_and_schedule, ) +from dagster_cloud.metadata.source_code import link_to_git_if_cloud from dagster_snowflake import SnowflakeResource # Used to derive environment (LOCAL, BRANCH, PROD) @@ -44,7 +45,9 @@ def get_env(): ) defs = Definitions( - assets=with_source_code_references([*snowflake_insights_definitions.assets,]), + assets=link_to_git_if_cloud( + with_source_code_references([*snowflake_insights_definitions.assets,]) + ), schedules=[snowflake_insights_definitions.schedule,], resources=resource_def[get_env()], ) From 7115b364b10a498836bd26cd1cb5ff9edcc54837 Mon Sep 17 00:00:00 2001 From: izzy Date: Mon, 1 Jul 2024 18:36:03 -0600 Subject: [PATCH 03/15] added repository_root_path parameter --- hooli-demo-assets/hooli_demo_assets/definitions.py | 5 ++++- hooli_basics/definitions.py | 7 +++++-- .../dagster_batch_enrichment/definitions.py | 5 ++++- hooli_data_eng/definitions.py | 4 +++- hooli_snowflake_insights/definitions.py | 4 +++- 5 files changed, 19 insertions(+), 6 deletions(-) diff --git 
a/hooli-demo-assets/hooli_demo_assets/definitions.py b/hooli-demo-assets/hooli_demo_assets/definitions.py index f4c0ec05..4089ac18 100644 --- a/hooli-demo-assets/hooli_demo_assets/definitions.py +++ b/hooli-demo-assets/hooli_demo_assets/definitions.py @@ -1,3 +1,5 @@ +from pathlib import Path + from dagster import ( Definitions, ) @@ -12,7 +14,8 @@ defs = Definitions( assets=link_to_git_if_cloud( - with_source_code_references([my_sling_assets]) + with_source_code_references([my_sling_assets]), + repository_root_absolute_path=Path(__file__).parent.parent.parent, ), schedules=[daily_sling_assets], jobs=[daily_sling_job], diff --git a/hooli_basics/definitions.py b/hooli_basics/definitions.py index 0e67af57..750206ed 100644 --- a/hooli_basics/definitions.py +++ b/hooli_basics/definitions.py @@ -1,3 +1,5 @@ +from pathlib import Path + from dagster import asset, asset_check, AssetCheckResult, Definitions from dagster._core.definitions.metadata import with_source_code_references from dagster_cloud.metadata.source_code import link_to_git_if_cloud @@ -31,8 +33,9 @@ def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataF defs = Definitions( assets=link_to_git_if_cloud( - with_source_code_references([country_stats, continent_stats, change_model]) - ), + with_source_code_references([country_stats, continent_stats, change_model]), + repository_root_absolute_path=Path(__file__).parent.parent, + ), asset_checks=[check_country_stats] ) diff --git a/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py b/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py index ffa15fa9..91d11ed4 100644 --- a/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py +++ b/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py @@ -1,3 +1,5 @@ +from pathlib import Path + from dagster import Definitions, define_asset_job, ScheduleDefinition, AssetSelection from dagster._core.definitions.metadata import with_source_code_references from 
dagster_batch_enrichment.api import EnrichmentAPI @@ -22,7 +24,8 @@ defs = Definitions( assets=link_to_git_if_cloud( - with_source_code_references([raw_data, enriched_data]) + with_source_code_references([raw_data, enriched_data]), + repository_root_absolute_path=Path(__file__).parent.parent.parent, ), schedules=[run_assets_30min], jobs=[run_assets_job], diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index 12343a31..07aadd10 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -1,3 +1,4 @@ +from pathlib import Path from dagster import ( Definitions, @@ -68,7 +69,8 @@ {"max_concurrent": 3} ), assets=link_to_git_if_cloud( - with_source_code_references([*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets]) + with_source_code_references([*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets]), + repository_root_absolute_path=Path(__file__).parent.parent, ), asset_checks=[*raw_data_schema_checks, *dbt_asset_checks, check_users, check_avg_orders, *min_order_freshness_check, *avg_orders_freshness_check, *weekly_freshness_check], resources=resource_def[get_env()], diff --git a/hooli_snowflake_insights/definitions.py b/hooli_snowflake_insights/definitions.py index 5becfd75..9f49dba9 100644 --- a/hooli_snowflake_insights/definitions.py +++ b/hooli_snowflake_insights/definitions.py @@ -1,4 +1,5 @@ import os +from pathlib import Path from dagster import Definitions, EnvVar, ResourceDefinition from dagster._core.definitions.metadata import with_source_code_references @@ -46,7 +47,8 @@ def get_env(): defs = Definitions( assets=link_to_git_if_cloud( - with_source_code_references([*snowflake_insights_definitions.assets,]) + with_source_code_references([*snowflake_insights_definitions.assets,]), + repository_root_absolute_path=Path(__file__).parent.parent, ), schedules=[snowflake_insights_definitions.schedule,], resources=resource_def[get_env()], From feeedb8e93d9f38148a9fd6a72f2b32436613f9a 
Mon Sep 17 00:00:00 2001 From: izzy Date: Wed, 3 Jul 2024 16:55:47 -0600 Subject: [PATCH 04/15] fixed repository_path param --- hooli-demo-assets/hooli_demo_assets/definitions.py | 2 +- hooli_basics/definitions.py | 2 +- hooli_batch_enrichment/dagster_batch_enrichment/definitions.py | 2 +- hooli_data_eng/definitions.py | 2 +- hooli_snowflake_insights/definitions.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hooli-demo-assets/hooli_demo_assets/definitions.py b/hooli-demo-assets/hooli_demo_assets/definitions.py index 4089ac18..3e3be05c 100644 --- a/hooli-demo-assets/hooli_demo_assets/definitions.py +++ b/hooli-demo-assets/hooli_demo_assets/definitions.py @@ -15,7 +15,7 @@ defs = Definitions( assets=link_to_git_if_cloud( with_source_code_references([my_sling_assets]), - repository_root_absolute_path=Path(__file__).parent.parent.parent, + repository_root_absolute_path=Path(__file__).parent.parent, ), schedules=[daily_sling_assets], jobs=[daily_sling_job], diff --git a/hooli_basics/definitions.py b/hooli_basics/definitions.py index 750206ed..1f396ec7 100644 --- a/hooli_basics/definitions.py +++ b/hooli_basics/definitions.py @@ -34,7 +34,7 @@ def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataF defs = Definitions( assets=link_to_git_if_cloud( with_source_code_references([country_stats, continent_stats, change_model]), - repository_root_absolute_path=Path(__file__).parent.parent, + repository_root_absolute_path=Path(__file__).parent, ), asset_checks=[check_country_stats] ) diff --git a/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py b/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py index 91d11ed4..9f61fbaa 100644 --- a/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py +++ b/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py @@ -25,7 +25,7 @@ defs = Definitions( assets=link_to_git_if_cloud( with_source_code_references([raw_data, enriched_data]), - 
repository_root_absolute_path=Path(__file__).parent.parent.parent, + repository_root_absolute_path=Path(__file__).parent.parent, ), schedules=[run_assets_30min], jobs=[run_assets_job], diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index 07aadd10..19e652cb 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -70,7 +70,7 @@ ), assets=link_to_git_if_cloud( with_source_code_references([*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets]), - repository_root_absolute_path=Path(__file__).parent.parent, + repository_root_absolute_path=Path(__file__).parent, ), asset_checks=[*raw_data_schema_checks, *dbt_asset_checks, check_users, check_avg_orders, *min_order_freshness_check, *avg_orders_freshness_check, *weekly_freshness_check], resources=resource_def[get_env()], diff --git a/hooli_snowflake_insights/definitions.py b/hooli_snowflake_insights/definitions.py index 9f49dba9..c49027c2 100644 --- a/hooli_snowflake_insights/definitions.py +++ b/hooli_snowflake_insights/definitions.py @@ -48,7 +48,7 @@ def get_env(): defs = Definitions( assets=link_to_git_if_cloud( with_source_code_references([*snowflake_insights_definitions.assets,]), - repository_root_absolute_path=Path(__file__).parent.parent, + repository_root_absolute_path=Path(__file__).parent, ), schedules=[snowflake_insights_definitions.schedule,], resources=resource_def[get_env()], From 1a306bbd0dddbf4704658a22081faadb4f69adf8 Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Fri, 5 Jul 2024 21:07:09 -0400 Subject: [PATCH 05/15] add sensor --- hooli_data_eng/sensors/__init__.py | 56 ++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/hooli_data_eng/sensors/__init__.py b/hooli_data_eng/sensors/__init__.py index fcd19e3d..d106ee85 100644 --- a/hooli_data_eng/sensors/__init__.py +++ b/hooli_data_eng/sensors/__init__.py @@ -1,17 +1,69 @@ +import hashlib +import json from dagster import ( asset_sensor, + 
sensor, AssetKey, EventLogEntry, RunRequest, SensorEvaluationContext, + AssetSelection, + SensorDefinition, + DagsterInstance, ) - +from datetime import datetime from hooli_data_eng.jobs import predict_job + +from hooli_data_eng.assets.dbt_assets import views_dbt_assets +from hooli_data_eng.project import dbt_project + # This sensor listens for changes to the orders_augmented asset which # represents a dbt model. When the table managed by dbt is updated, # this sensor will trigger the predict_job above, ensuring that anytime # new order data is produced the forecast is updated @asset_sensor(asset_key=AssetKey(["ANALYTICS", "orders_augmented"]), job=predict_job) def orders_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry): - yield RunRequest(run_key=context.cursor) \ No newline at end of file + yield RunRequest(run_key=context.cursor) + + +def get_current_dbt_code_version(asset_key: AssetKey) -> str: + with open(dbt_project.manifest_path) as f: + manifest = json.load(f) + + model_name = asset_key.path[-1] + model_sql = manifest["nodes"][f"model.dbt_project.{model_name}"]["raw_code"] + + return hashlib.sha1(model_sql.encode("utf-8")).hexdigest() + + +@sensor(asset_selection=AssetSelection.assets(views_dbt_assets)) +def my_dbt_code_version_sensor(context: SensorEvaluationContext): + #asset_keys = [AssetKey("my_dbt_model_1"), AssetKey("my_dbt_model_2")] # List your dbt asset keys here + + context.log.info(f"Checking code versions for assets: {views_dbt_assets.keys}") + print(f"Checking code versions for assets: {views_dbt_assets.keys}") + assets_to_materialize = [] + for asset_key in views_dbt_assets.keys: + # instance = DagsterInstance.get() + latest_materialization = context.instance.get_latest_materialization_event(asset_key) + if latest_materialization: + latest_code_version = latest_materialization.asset_materialization.tags.get("dagster/code_version") + context.log.info(f"Latest code version for {asset_key}: {latest_code_version}") + 
current_code_version = get_current_dbt_code_version(asset_key) # Implement this function to get the current code version + context.log.info(f"Current code version for {asset_key}: {current_code_version}") + if latest_code_version != current_code_version: + assets_to_materialize.append(asset_key) + context.log.info(f"Assets to materialize: {assets_to_materialize}") + if assets_to_materialize: + yield RunRequest( + run_key=f"code_version_update_{datetime.now()}", + asset_selection=list(assets_to_materialize) + ) + # if never materialized before, materialize all + # currently doesn't work + # else: + # yield RunRequest( + # run_key=f"code_version_update_{datetime.now()}", + # asset_selection=list(views_dbt_assets.keys) + # ) \ No newline at end of file From 7c45afb6cf302966e24bab8145fb4edf205e9cd4 Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Fri, 5 Jul 2024 21:08:13 -0400 Subject: [PATCH 06/15] add to definitions --- hooli_data_eng/definitions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index 19e652cb..a281056f 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -17,7 +17,7 @@ from hooli_data_eng.jobs import analytics_job, predict_job from hooli_data_eng.resources import get_env, resource_def from hooli_data_eng.schedules import analytics_schedule -from hooli_data_eng.sensors import orders_sensor +from hooli_data_eng.sensors import orders_sensor, my_dbt_code_version_sensor from hooli_data_eng.sensors.watch_s3 import watch_s3_sensor from hooli_data_eng.assets.marketing import avg_orders_freshness_check, min_order_freshness_check, min_order_freshness_check_sensor, check_avg_orders, avg_orders_freshness_check_schedule from hooli_data_eng.assets.dbt_assets import weekly_freshness_check, weekly_freshness_check_sensor @@ -80,6 +80,7 @@ watch_s3_sensor, # asset_delay_alert_sensor, min_order_freshness_check_sensor, + my_dbt_code_version_sensor, 
weekly_freshness_check_sensor ], jobs=[analytics_job, predict_job, dbt_slim_ci_job], From fcd30bdcdb224d0079499efd16b637dd4102406d Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Mon, 8 Jul 2024 10:29:33 -0400 Subject: [PATCH 07/15] reorganize code --- hooli_data_eng/definitions.py | 4 ++-- hooli_data_eng/sensors/__init__.py | 29 +++--------------------- hooli_data_eng/utils/dbt_code_version.py | 13 +++++++++++ 3 files changed, 18 insertions(+), 28 deletions(-) create mode 100644 hooli_data_eng/utils/dbt_code_version.py diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index a281056f..43f8e5dd 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -17,7 +17,7 @@ from hooli_data_eng.jobs import analytics_job, predict_job from hooli_data_eng.resources import get_env, resource_def from hooli_data_eng.schedules import analytics_schedule -from hooli_data_eng.sensors import orders_sensor, my_dbt_code_version_sensor +from hooli_data_eng.sensors import orders_sensor, dbt_code_version_sensor from hooli_data_eng.sensors.watch_s3 import watch_s3_sensor from hooli_data_eng.assets.marketing import avg_orders_freshness_check, min_order_freshness_check, min_order_freshness_check_sensor, check_avg_orders, avg_orders_freshness_check_schedule from hooli_data_eng.assets.dbt_assets import weekly_freshness_check, weekly_freshness_check_sensor @@ -80,7 +80,7 @@ watch_s3_sensor, # asset_delay_alert_sensor, min_order_freshness_check_sensor, - my_dbt_code_version_sensor, + dbt_code_version_sensor, weekly_freshness_check_sensor ], jobs=[analytics_job, predict_job, dbt_slim_ci_job], diff --git a/hooli_data_eng/sensors/__init__.py b/hooli_data_eng/sensors/__init__.py index d106ee85..586d0e86 100644 --- a/hooli_data_eng/sensors/__init__.py +++ b/hooli_data_eng/sensors/__init__.py @@ -1,5 +1,3 @@ -import hashlib -import json from dagster import ( asset_sensor, sensor, @@ -8,15 +6,13 @@ RunRequest, SensorEvaluationContext, AssetSelection, 
- SensorDefinition, - DagsterInstance, ) from datetime import datetime from hooli_data_eng.jobs import predict_job from hooli_data_eng.assets.dbt_assets import views_dbt_assets -from hooli_data_eng.project import dbt_project +from hooli_data_eng.utils import get_current_dbt_code_version # This sensor listens for changes to the orders_augmented asset which # represents a dbt model. When the table managed by dbt is updated, @@ -27,25 +23,13 @@ def orders_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry): yield RunRequest(run_key=context.cursor) -def get_current_dbt_code_version(asset_key: AssetKey) -> str: - with open(dbt_project.manifest_path) as f: - manifest = json.load(f) - - model_name = asset_key.path[-1] - model_sql = manifest["nodes"][f"model.dbt_project.{model_name}"]["raw_code"] - - return hashlib.sha1(model_sql.encode("utf-8")).hexdigest() - @sensor(asset_selection=AssetSelection.assets(views_dbt_assets)) -def my_dbt_code_version_sensor(context: SensorEvaluationContext): - #asset_keys = [AssetKey("my_dbt_model_1"), AssetKey("my_dbt_model_2")] # List your dbt asset keys here +def dbt_code_version_sensor(context: SensorEvaluationContext): context.log.info(f"Checking code versions for assets: {views_dbt_assets.keys}") - print(f"Checking code versions for assets: {views_dbt_assets.keys}") assets_to_materialize = [] for asset_key in views_dbt_assets.keys: - # instance = DagsterInstance.get() latest_materialization = context.instance.get_latest_materialization_event(asset_key) if latest_materialization: latest_code_version = latest_materialization.asset_materialization.tags.get("dagster/code_version") @@ -59,11 +43,4 @@ def my_dbt_code_version_sensor(context: SensorEvaluationContext): yield RunRequest( run_key=f"code_version_update_{datetime.now()}", asset_selection=list(assets_to_materialize) - ) - # if never materialized before, materialize all - # currently doesn't work - # else: - # yield RunRequest( - # 
run_key=f"code_version_update_{datetime.now()}", - # asset_selection=list(views_dbt_assets.keys) - # ) \ No newline at end of file + ) \ No newline at end of file diff --git a/hooli_data_eng/utils/dbt_code_version.py b/hooli_data_eng/utils/dbt_code_version.py new file mode 100644 index 00000000..08e93627 --- /dev/null +++ b/hooli_data_eng/utils/dbt_code_version.py @@ -0,0 +1,13 @@ +import hashlib +import json +from dagster import AssetKey +from dagster_dbt import dbt_project + +def get_current_dbt_code_version(asset_key: AssetKey) -> str: + with open(dbt_project.manifest_path) as f: + manifest = json.load(f) + + model_name = asset_key.path[-1] + model_sql = manifest["nodes"][f"model.dbt_project.{model_name}"]["raw_code"] + + return hashlib.sha1(model_sql.encode("utf-8")).hexdigest() \ No newline at end of file From ceda7d6b95c2ad485d745722ae7779f93fe16e83 Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Mon, 8 Jul 2024 10:35:22 -0400 Subject: [PATCH 08/15] fix import --- hooli_data_eng/sensors/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hooli_data_eng/sensors/__init__.py b/hooli_data_eng/sensors/__init__.py index 586d0e86..2c45f25a 100644 --- a/hooli_data_eng/sensors/__init__.py +++ b/hooli_data_eng/sensors/__init__.py @@ -12,7 +12,7 @@ from hooli_data_eng.assets.dbt_assets import views_dbt_assets -from hooli_data_eng.utils import get_current_dbt_code_version +from hooli_data_eng.utils.dbt_code_version import get_current_dbt_code_version # This sensor listens for changes to the orders_augmented asset which # represents a dbt model. 
When the table managed by dbt is updated, From 98bbc24e51492debd2c0160f5ffc69d618d24338 Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Mon, 8 Jul 2024 10:41:21 -0400 Subject: [PATCH 09/15] use proper import --- hooli_data_eng/utils/dbt_code_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hooli_data_eng/utils/dbt_code_version.py b/hooli_data_eng/utils/dbt_code_version.py index 08e93627..dcf139dd 100644 --- a/hooli_data_eng/utils/dbt_code_version.py +++ b/hooli_data_eng/utils/dbt_code_version.py @@ -1,7 +1,7 @@ import hashlib import json from dagster import AssetKey -from dagster_dbt import dbt_project +from hooli_data_eng.project import dbt_project def get_current_dbt_code_version(asset_key: AssetKey) -> str: with open(dbt_project.manifest_path) as f: From d1a26c6a765498af7d5bc112ddac0c2736f26673 Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Mon, 8 Jul 2024 10:50:01 -0400 Subject: [PATCH 10/15] lowercase schema for duckdb --- hooli-demo-assets/hooli_demo_assets/resources/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hooli-demo-assets/hooli_demo_assets/resources/__init__.py b/hooli-demo-assets/hooli_demo_assets/resources/__init__.py index bd15a2e5..338d36f1 100644 --- a/hooli-demo-assets/hooli_demo_assets/resources/__init__.py +++ b/hooli-demo-assets/hooli_demo_assets/resources/__init__.py @@ -68,7 +68,7 @@ def create_sling_resource(env: str): type="duckdb", instance=f"{DUCKDB_PATH}", database="example", - schema="RAW_DATA", + schema="raw_data", )) elif env == 'BRANCH': connections.append(SlingConnectionResource( From a6ba5cc833d775d5151499e7a01b036ddb2cc63b Mon Sep 17 00:00:00 2001 From: izzy Date: Fri, 12 Jul 2024 10:11:40 -0600 Subject: [PATCH 11/15] added additional code location examples --- workspace.yaml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/workspace.yaml b/workspace.yaml index 07ecd2b8..06d0cdf4 100644 --- a/workspace.yaml +++ b/workspace.yaml @@ -4,4 
+4,8 @@ load_from: # Example of deploying multiple code locations locally # - python_module: # module_name: hooli_demo_assets -# working_directory: hooli-demo-assets/ \ No newline at end of file +# working_directory: hooli-demo-assets/ +# - python_file: hooli_basics/definitions.py +# - python_module: +# module_name: dagster_batch_enrichment +# working_directory: hooli_batch_enrichment/ \ No newline at end of file From daca98fee6ecd0b2d3c596b980f9e8d5ded770df Mon Sep 17 00:00:00 2001 From: izzy Date: Fri, 12 Jul 2024 10:12:40 -0600 Subject: [PATCH 12/15] updated code references to use AnchorBasedFilePathMapping --- .../hooli_demo_assets/definitions.py | 11 ++++++++--- hooli_basics/definitions.py | 19 ++++++++++++++----- .../dagster_batch_enrichment/definitions.py | 19 ++++++++++++++----- hooli_data_eng/definitions.py | 7 +++---- 4 files changed, 39 insertions(+), 17 deletions(-) diff --git a/hooli-demo-assets/hooli_demo_assets/definitions.py b/hooli-demo-assets/hooli_demo_assets/definitions.py index 3e3be05c..2e923fcf 100644 --- a/hooli-demo-assets/hooli_demo_assets/definitions.py +++ b/hooli-demo-assets/hooli_demo_assets/definitions.py @@ -1,10 +1,12 @@ from pathlib import Path from dagster import ( + AnchorBasedFilePathMapping, Definitions, + with_source_code_references, ) from dagster._core.definitions.metadata import with_source_code_references -from dagster_cloud.metadata.source_code import link_to_git_if_cloud +from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud from hooli_demo_assets.assets.sling import my_sling_assets from hooli_demo_assets.jobs import daily_sling_job @@ -13,9 +15,12 @@ defs = Definitions( - assets=link_to_git_if_cloud( + assets=link_code_references_to_git_if_cloud( with_source_code_references([my_sling_assets]), - repository_root_absolute_path=Path(__file__).parent.parent, + file_path_mapping=AnchorBasedFilePathMapping( + local_file_anchor=Path(__file__), + 
file_anchor_path_in_repository="hooli-data-eng-pipelines/hooli-demo-assets/hooli_demo_assets/definitions.py", + ), ), schedules=[daily_sling_assets], jobs=[daily_sling_job], diff --git a/hooli_basics/definitions.py b/hooli_basics/definitions.py index 1f396ec7..8bc5754e 100644 --- a/hooli_basics/definitions.py +++ b/hooli_basics/definitions.py @@ -1,8 +1,14 @@ from pathlib import Path -from dagster import asset, asset_check, AssetCheckResult, Definitions -from dagster._core.definitions.metadata import with_source_code_references -from dagster_cloud.metadata.source_code import link_to_git_if_cloud +from dagster import ( + AnchorBasedFilePathMapping, + asset, + asset_check, + AssetCheckResult, + Definitions, + with_source_code_references, +) +from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud from pandas import DataFrame, read_html, get_dummies, to_numeric from sklearn.linear_model import LinearRegression as Regression @@ -32,9 +38,12 @@ def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataF return result defs = Definitions( - assets=link_to_git_if_cloud( + assets=link_code_references_to_git_if_cloud( with_source_code_references([country_stats, continent_stats, change_model]), - repository_root_absolute_path=Path(__file__).parent, + file_path_mapping=AnchorBasedFilePathMapping( + local_file_anchor=Path(__file__), + file_anchor_path_in_repository="hooli-data-eng-pipelines/hooli_basics/definitions.py", + ), ), asset_checks=[check_country_stats] ) diff --git a/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py b/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py index 9f61fbaa..5519cb65 100644 --- a/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py +++ b/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py @@ -1,11 +1,17 @@ from pathlib import Path -from dagster import Definitions, define_asset_job, ScheduleDefinition, AssetSelection -from 
dagster._core.definitions.metadata import with_source_code_references +from dagster import ( + AnchorBasedFilePathMapping, + AssetSelection, + define_asset_job, + Definitions, + ScheduleDefinition, + with_source_code_references, +) from dagster_batch_enrichment.api import EnrichmentAPI from dagster_batch_enrichment.warehouse import MyWarehouse from dagster_batch_enrichment.assets import raw_data, enriched_data -from dagster_cloud.metadata.source_code import link_to_git_if_cloud +from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud # define a job and schedule to run the pipeline @@ -23,9 +29,12 @@ ) defs = Definitions( - assets=link_to_git_if_cloud( + assets=link_code_references_to_git_if_cloud( with_source_code_references([raw_data, enriched_data]), - repository_root_absolute_path=Path(__file__).parent.parent, + file_path_mapping=AnchorBasedFilePathMapping( + local_file_anchor=Path(__file__), + file_anchor_path_in_repository="hooli-data-eng-pipelines/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py", + ), ), schedules=[run_assets_30min], jobs=[run_assets_job], diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index 43f8e5dd..8bbdaed1 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -6,9 +6,9 @@ load_assets_from_package_module, build_column_schema_change_checks, multiprocess_executor, + with_source_code_references, ) -from dagster._core.definitions.metadata import with_source_code_references -from dagster_cloud.metadata.source_code import link_to_git_if_cloud +from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets from hooli_data_eng.assets.dbt_assets import dbt_slim_ci_job @@ -68,9 +68,8 @@ executor=multiprocess_executor.configured( {"max_concurrent": 3} ), - assets=link_to_git_if_cloud( + assets=link_code_references_to_git_if_cloud( 
with_source_code_references([*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets]), - repository_root_absolute_path=Path(__file__).parent, ), asset_checks=[*raw_data_schema_checks, *dbt_asset_checks, check_users, check_avg_orders, *min_order_freshness_check, *avg_orders_freshness_check, *weekly_freshness_check], resources=resource_def[get_env()], From d55503fb919c6875eb969d0d3010b4c96b585e00 Mon Sep 17 00:00:00 2001 From: izzy Date: Fri, 12 Jul 2024 10:20:38 -0600 Subject: [PATCH 13/15] updated AnchorBasedFilePathMapping --- hooli_snowflake_insights/definitions.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/hooli_snowflake_insights/definitions.py b/hooli_snowflake_insights/definitions.py index c49027c2..af8f7a68 100644 --- a/hooli_snowflake_insights/definitions.py +++ b/hooli_snowflake_insights/definitions.py @@ -1,12 +1,17 @@ import os from pathlib import Path -from dagster import Definitions, EnvVar, ResourceDefinition -from dagster._core.definitions.metadata import with_source_code_references +from dagster import ( + AnchorBasedFilePathMapping, + Definitions, + EnvVar, + ResourceDefinition, + with_source_code_references, +) from dagster_cloud.dagster_insights import ( create_snowflake_insights_asset_and_schedule, ) -from dagster_cloud.metadata.source_code import link_to_git_if_cloud +from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud from dagster_snowflake import SnowflakeResource # Used to derive environment (LOCAL, BRANCH, PROD) @@ -46,9 +51,12 @@ def get_env(): ) defs = Definitions( - assets=link_to_git_if_cloud( + assets=link_code_references_to_git_if_cloud( with_source_code_references([*snowflake_insights_definitions.assets,]), - repository_root_absolute_path=Path(__file__).parent, + file_path_mapping=AnchorBasedFilePathMapping( + local_file_anchor=Path(__file__), + file_anchor_path_in_repository="hooli-data-eng-pipelines/hooli_snowflake_insights/definitions.py", + 
), ), schedules=[snowflake_insights_definitions.schedule,], resources=resource_def[get_env()], From da12f14e8dd18e1b7b2cfce23b1b5ef5ca290760 Mon Sep 17 00:00:00 2001 From: izzy Date: Fri, 12 Jul 2024 10:30:30 -0600 Subject: [PATCH 14/15] fixed CLI command that was renamed --- .github/workflows/deploy-dagster-cloud.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/deploy-dagster-cloud.yml b/.github/workflows/deploy-dagster-cloud.yml index cf0d3989..9514e898 100644 --- a/.github/workflows/deploy-dagster-cloud.yml +++ b/.github/workflows/deploy-dagster-cloud.yml @@ -85,7 +85,7 @@ jobs: pip install pip --upgrade; pip install dagster-dbt dagster-cloud dbt-core dbt-duckdb dbt-snowflake --upgrade --upgrade-strategy eager; make deps - dagster-dbt project prepare-for-deployment --file hooli_data_eng/project.py + dagster-dbt project prepare-and-package --file hooli_data_eng/project.py dagster-cloud ci dagster-dbt project manage-state --file hooli_data_eng/project.py --source-deployment data-eng-prod - name: Build and upload Docker image for data-eng-pipeline From 9e17213d30d20acf90c64acb91268c03b6f938c1 Mon Sep 17 00:00:00 2001 From: izzy Date: Fri, 12 Jul 2024 10:48:47 -0600 Subject: [PATCH 15/15] removed root folder from AnchorBasedFilePathMapping path --- hooli-demo-assets/hooli_demo_assets/definitions.py | 2 +- hooli_basics/definitions.py | 2 +- hooli_batch_enrichment/dagster_batch_enrichment/definitions.py | 2 +- hooli_snowflake_insights/definitions.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hooli-demo-assets/hooli_demo_assets/definitions.py b/hooli-demo-assets/hooli_demo_assets/definitions.py index 2e923fcf..32c2360d 100644 --- a/hooli-demo-assets/hooli_demo_assets/definitions.py +++ b/hooli-demo-assets/hooli_demo_assets/definitions.py @@ -19,7 +19,7 @@ with_source_code_references([my_sling_assets]), file_path_mapping=AnchorBasedFilePathMapping( local_file_anchor=Path(__file__), - 
file_anchor_path_in_repository="hooli-data-eng-pipelines/hooli-demo-assets/hooli_demo_assets/definitions.py", + file_anchor_path_in_repository="hooli-demo-assets/hooli_demo_assets/definitions.py", ), ), schedules=[daily_sling_assets], diff --git a/hooli_basics/definitions.py b/hooli_basics/definitions.py index 8bc5754e..d124c684 100644 --- a/hooli_basics/definitions.py +++ b/hooli_basics/definitions.py @@ -42,7 +42,7 @@ def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataF with_source_code_references([country_stats, continent_stats, change_model]), file_path_mapping=AnchorBasedFilePathMapping( local_file_anchor=Path(__file__), - file_anchor_path_in_repository="hooli-data-eng-pipelines/hooli_basics/definitions.py", + file_anchor_path_in_repository="hooli_basics/definitions.py", ), ), asset_checks=[check_country_stats] diff --git a/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py b/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py index 5519cb65..25a51401 100644 --- a/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py +++ b/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py @@ -33,7 +33,7 @@ with_source_code_references([raw_data, enriched_data]), file_path_mapping=AnchorBasedFilePathMapping( local_file_anchor=Path(__file__), - file_anchor_path_in_repository="hooli-data-eng-pipelines/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py", + file_anchor_path_in_repository="hooli_batch_enrichment/dagster_batch_enrichment/definitions.py", ), ), schedules=[run_assets_30min], diff --git a/hooli_snowflake_insights/definitions.py b/hooli_snowflake_insights/definitions.py index af8f7a68..e19f2c46 100644 --- a/hooli_snowflake_insights/definitions.py +++ b/hooli_snowflake_insights/definitions.py @@ -55,7 +55,7 @@ def get_env(): with_source_code_references([*snowflake_insights_definitions.assets,]), file_path_mapping=AnchorBasedFilePathMapping( local_file_anchor=Path(__file__), - 
file_anchor_path_in_repository="hooli-data-eng-pipelines/hooli_snowflake_insights/definitions.py", + file_anchor_path_in_repository="hooli_snowflake_insights/definitions.py", ), ), schedules=[snowflake_insights_definitions.schedule,],