diff --git a/.github/workflows/deploy-dagster-cloud.yml b/.github/workflows/deploy-dagster-cloud.yml index cf0d398..9514e89 100644 --- a/.github/workflows/deploy-dagster-cloud.yml +++ b/.github/workflows/deploy-dagster-cloud.yml @@ -85,7 +85,7 @@ jobs: pip install pip --upgrade; pip install dagster-dbt dagster-cloud dbt-core dbt-duckdb dbt-snowflake --upgrade --upgrade-strategy eager; make deps - dagster-dbt project prepare-for-deployment --file hooli_data_eng/project.py + dagster-dbt project prepare-and-package --file hooli_data_eng/project.py dagster-cloud ci dagster-dbt project manage-state --file hooli_data_eng/project.py --source-deployment data-eng-prod - name: Build and upload Docker image for data-eng-pipeline diff --git a/hooli-demo-assets/hooli_demo_assets/definitions.py b/hooli-demo-assets/hooli_demo_assets/definitions.py index 3c6aa84..32c2360 100644 --- a/hooli-demo-assets/hooli_demo_assets/definitions.py +++ b/hooli-demo-assets/hooli_demo_assets/definitions.py @@ -1,6 +1,12 @@ +from pathlib import Path + from dagster import ( + AnchorBasedFilePathMapping, Definitions, + with_source_code_references, ) +from dagster._core.definitions.metadata import with_source_code_references +from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud from hooli_demo_assets.assets.sling import my_sling_assets from hooli_demo_assets.jobs import daily_sling_job @@ -9,7 +15,13 @@ defs = Definitions( - assets=[my_sling_assets], + assets=link_code_references_to_git_if_cloud( + with_source_code_references([my_sling_assets]), + file_path_mapping=AnchorBasedFilePathMapping( + local_file_anchor=Path(__file__), + file_anchor_path_in_repository="hooli-demo-assets/hooli_demo_assets/definitions.py", + ), + ), schedules=[daily_sling_assets], jobs=[daily_sling_job], resources={ diff --git a/hooli_basics/definitions.py b/hooli_basics/definitions.py index d5699e2..d124c68 100644 --- a/hooli_basics/definitions.py +++ b/hooli_basics/definitions.py @@ -1,4 +1,14 @@ -from dagster import asset, asset_check, AssetCheckResult, Definitions +from pathlib import Path + +from dagster import ( + AnchorBasedFilePathMapping, + asset, + asset_check, + AssetCheckResult, + Definitions, + with_source_code_references, +) +from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud from pandas import DataFrame, read_html, get_dummies, to_numeric from sklearn.linear_model import LinearRegression as Regression @@ -28,7 +38,13 @@ def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataF return result defs = Definitions( - assets=[country_stats, continent_stats, change_model], + assets=link_code_references_to_git_if_cloud( + with_source_code_references([country_stats, continent_stats, change_model]), + file_path_mapping=AnchorBasedFilePathMapping( + local_file_anchor=Path(__file__), + file_anchor_path_in_repository="hooli_basics/definitions.py", + ), + ), asset_checks=[check_country_stats] ) diff --git a/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py b/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py index b2a57d2..25a5140 100644 --- a/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py +++ b/hooli_batch_enrichment/dagster_batch_enrichment/definitions.py @@ -1,7 +1,17 @@ -from dagster import Definitions, define_asset_job, ScheduleDefinition, AssetSelection +from pathlib import Path + +from dagster import ( + AnchorBasedFilePathMapping, + AssetSelection, + define_asset_job, + Definitions, + ScheduleDefinition, + with_source_code_references, +) from dagster_batch_enrichment.api import EnrichmentAPI from dagster_batch_enrichment.warehouse import MyWarehouse from dagster_batch_enrichment.assets import raw_data, enriched_data +from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud # define a job and schedule to run the pipeline @@ -19,7 +29,13 @@ ) defs = Definitions( - assets=[raw_data, enriched_data], + assets=link_code_references_to_git_if_cloud( + with_source_code_references([raw_data, enriched_data]), + file_path_mapping=AnchorBasedFilePathMapping( + local_file_anchor=Path(__file__), + file_anchor_path_in_repository="hooli_batch_enrichment/dagster_batch_enrichment/definitions.py", + ), + ), schedules=[run_assets_30min], jobs=[run_assets_job], resources={ diff --git a/hooli_data_eng/assets/dbt_assets.py b/hooli_data_eng/assets/dbt_assets.py index d993d4b..4674f5f 100644 --- a/hooli_data_eng/assets/dbt_assets.py +++ b/hooli_data_eng/assets/dbt_assets.py @@ -133,10 +133,12 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt: DbtCliReso @dbt_assets( manifest=DBT_MANIFEST, + project=dbt_project, select="orders_cleaned users_cleaned orders_augmented", partitions_def=daily_partitions, dagster_dbt_translator=CustomDagsterDbtTranslator( - settings=DagsterDbtTranslatorSettings(enable_asset_checks=True) + settings=DagsterDbtTranslatorSettings(enable_asset_checks=True, + enable_code_references=True,) ), backfill_policy=BackfillPolicy.single_run(), ) @@ -146,10 +148,12 @@ def daily_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource): @dbt_assets( manifest=DBT_MANIFEST, + project=dbt_project, select="weekly_order_summary order_stats", partitions_def=weekly_partitions, dagster_dbt_translator=CustomDagsterDbtTranslator( - DagsterDbtTranslatorSettings(enable_asset_checks=True) + DagsterDbtTranslatorSettings(enable_asset_checks=True, + enable_code_references=True,) ), backfill_policy=BackfillPolicy.single_run(), ) @@ -164,9 +168,11 @@ def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource): @dbt_assets( manifest=DBT_MANIFEST, + project=dbt_project, select="company_perf sku_stats company_stats locations_cleaned", dagster_dbt_translator=CustomDagsterDbtTranslatorForViews( - DagsterDbtTranslatorSettings(enable_asset_checks=True) + DagsterDbtTranslatorSettings(enable_asset_checks=True, + enable_code_references=True,) ), ) def views_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource): @@ -203,8 +209,10 @@ def dbt_slim_ci(dbt2: DbtCliResource): yield from dbt2.cli( args=dbt_command, manifest=DBT_MANIFEST, + project=dbt_project, dagster_dbt_translator=CustomDagsterDbtTranslator( - DagsterDbtTranslatorSettings(enable_asset_checks=True) + DagsterDbtTranslatorSettings(enable_asset_checks=True, + enable_code_references=True,) ), ).stream().fetch_row_counts().fetch_column_metadata() diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index 854d73c..8bbdaed 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -1,3 +1,4 @@ +from pathlib import Path from dagster import ( Definitions, @@ -5,7 +6,9 @@ load_assets_from_package_module, build_column_schema_change_checks, multiprocess_executor, + with_source_code_references, ) +from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets from hooli_data_eng.assets.dbt_assets import dbt_slim_ci_job @@ -65,7 +68,9 @@ executor=multiprocess_executor.configured( {"max_concurrent": 3} ), - assets=[*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets], # + assets=link_code_references_to_git_if_cloud( + with_source_code_references([*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets]), + ), asset_checks=[*raw_data_schema_checks, *dbt_asset_checks, check_users, check_avg_orders, *min_order_freshness_check, *avg_orders_freshness_check, *weekly_freshness_check], resources=resource_def[get_env()], schedules=[analytics_schedule, avg_orders_freshness_check_schedule], diff --git a/hooli_snowflake_insights/definitions.py b/hooli_snowflake_insights/definitions.py index c5412ec..e19f2c4 100644 --- a/hooli_snowflake_insights/definitions.py +++ b/hooli_snowflake_insights/definitions.py @@ -1,9 +1,17 @@ import os +from pathlib import Path -from dagster import Definitions, EnvVar, ResourceDefinition +from dagster import ( + AnchorBasedFilePathMapping, + Definitions, + EnvVar, + ResourceDefinition, + with_source_code_references, +) from dagster_cloud.dagster_insights import ( create_snowflake_insights_asset_and_schedule, ) +from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud from dagster_snowflake import SnowflakeResource # Used to derive environment (LOCAL, BRANCH, PROD) @@ -43,7 +51,13 @@ def get_env(): ) defs = Definitions( - assets=[*snowflake_insights_definitions.assets,], + assets=link_code_references_to_git_if_cloud( + with_source_code_references([*snowflake_insights_definitions.assets,]), + file_path_mapping=AnchorBasedFilePathMapping( + local_file_anchor=Path(__file__), + file_anchor_path_in_repository="hooli_snowflake_insights/definitions.py", + ), + ), schedules=[snowflake_insights_definitions.schedule,], resources=resource_def[get_env()], ) diff --git a/workspace.yaml b/workspace.yaml index 07ecd2b..06d0cdf 100644 --- a/workspace.yaml +++ b/workspace.yaml @@ -4,4 +4,8 @@ load_from: # Example of deploying multiple code locations locally # - python_module: # module_name: hooli_demo_assets -# working_directory: hooli-demo-assets/ \ No newline at end of file +# working_directory: hooli-demo-assets/ +# - python_file: hooli_basics/definitions.py +# - python_module: +# module_name: dagster_batch_enrichment +# working_directory: hooli_batch_enrichment/ \ No newline at end of file