Skip to content

Commit

Permalink
Merge pull request #100 from dagster-io/izzy/add_code_references
Browse files Browse the repository at this point in the history
Add links to source code
  • Loading branch information
izzye84 committed Jul 12, 2024
2 parents 5e2e985 + 9e17213 commit 4aa28f2
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/deploy-dagster-cloud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ jobs:
pip install pip --upgrade;
pip install dagster-dbt dagster-cloud dbt-core dbt-duckdb dbt-snowflake --upgrade --upgrade-strategy eager;
make deps
dagster-dbt project prepare-for-deployment --file hooli_data_eng/project.py
dagster-dbt project prepare-and-package --file hooli_data_eng/project.py
dagster-cloud ci dagster-dbt project manage-state --file hooli_data_eng/project.py --source-deployment data-eng-prod
- name: Build and upload Docker image for data-eng-pipeline
Expand Down
14 changes: 13 additions & 1 deletion hooli-demo-assets/hooli_demo_assets/definitions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
from pathlib import Path

from dagster import (
AnchorBasedFilePathMapping,
Definitions,
with_source_code_references,
)
from dagster._core.definitions.metadata import with_source_code_references
from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud

from hooli_demo_assets.assets.sling import my_sling_assets
from hooli_demo_assets.jobs import daily_sling_job
Expand All @@ -9,7 +15,13 @@


defs = Definitions(
assets=[my_sling_assets],
assets=link_code_references_to_git_if_cloud(
with_source_code_references([my_sling_assets]),
file_path_mapping=AnchorBasedFilePathMapping(
local_file_anchor=Path(__file__),
file_anchor_path_in_repository="hooli-demo-assets/hooli_demo_assets/definitions.py",
),
),
schedules=[daily_sling_assets],
jobs=[daily_sling_job],
resources={
Expand Down
20 changes: 18 additions & 2 deletions hooli_basics/definitions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
from dagster import asset, asset_check, AssetCheckResult, Definitions
from pathlib import Path

from dagster import (
AnchorBasedFilePathMapping,
asset,
asset_check,
AssetCheckResult,
Definitions,
with_source_code_references,
)
from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud
from pandas import DataFrame, read_html, get_dummies, to_numeric
from sklearn.linear_model import LinearRegression as Regression

Expand Down Expand Up @@ -28,7 +38,13 @@ def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataF
return result

defs = Definitions(
assets=[country_stats, continent_stats, change_model],
assets=link_code_references_to_git_if_cloud(
with_source_code_references([country_stats, continent_stats, change_model]),
file_path_mapping=AnchorBasedFilePathMapping(
local_file_anchor=Path(__file__),
file_anchor_path_in_repository="hooli_basics/definitions.py",
),
),
asset_checks=[check_country_stats]
)

20 changes: 18 additions & 2 deletions hooli_batch_enrichment/dagster_batch_enrichment/definitions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
from dagster import Definitions, define_asset_job, ScheduleDefinition, AssetSelection
from pathlib import Path

from dagster import (
AnchorBasedFilePathMapping,
AssetSelection,
define_asset_job,
Definitions,
ScheduleDefinition,
with_source_code_references,
)
from dagster_batch_enrichment.api import EnrichmentAPI
from dagster_batch_enrichment.warehouse import MyWarehouse
from dagster_batch_enrichment.assets import raw_data, enriched_data
from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud


# define a job and schedule to run the pipeline
Expand All @@ -19,7 +29,13 @@
)

defs = Definitions(
assets=[raw_data, enriched_data],
assets=link_code_references_to_git_if_cloud(
with_source_code_references([raw_data, enriched_data]),
file_path_mapping=AnchorBasedFilePathMapping(
local_file_anchor=Path(__file__),
file_anchor_path_in_repository="hooli_batch_enrichment/dagster_batch_enrichment/definitions.py",
),
),
schedules=[run_assets_30min],
jobs=[run_assets_job],
resources={
Expand Down
16 changes: 12 additions & 4 deletions hooli_data_eng/assets/dbt_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,12 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt: DbtCliReso

@dbt_assets(
manifest=DBT_MANIFEST,
project=dbt_project,
select="orders_cleaned users_cleaned orders_augmented",
partitions_def=daily_partitions,
dagster_dbt_translator=CustomDagsterDbtTranslator(
settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)
settings=DagsterDbtTranslatorSettings(enable_asset_checks=True,
enable_code_references=True,)
),
backfill_policy=BackfillPolicy.single_run(),
)
Expand All @@ -146,10 +148,12 @@ def daily_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):

@dbt_assets(
manifest=DBT_MANIFEST,
project=dbt_project,
select="weekly_order_summary order_stats",
partitions_def=weekly_partitions,
dagster_dbt_translator=CustomDagsterDbtTranslator(
DagsterDbtTranslatorSettings(enable_asset_checks=True)
DagsterDbtTranslatorSettings(enable_asset_checks=True,
enable_code_references=True,)
),
backfill_policy=BackfillPolicy.single_run(),
)
Expand All @@ -164,9 +168,11 @@ def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):

@dbt_assets(
manifest=DBT_MANIFEST,
project=dbt_project,
select="company_perf sku_stats company_stats locations_cleaned",
dagster_dbt_translator=CustomDagsterDbtTranslatorForViews(
DagsterDbtTranslatorSettings(enable_asset_checks=True)
DagsterDbtTranslatorSettings(enable_asset_checks=True,
enable_code_references=True,)
),
)
def views_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
Expand Down Expand Up @@ -203,8 +209,10 @@ def dbt_slim_ci(dbt2: DbtCliResource):
yield from dbt2.cli(
args=dbt_command,
manifest=DBT_MANIFEST,
project=dbt_project,
dagster_dbt_translator=CustomDagsterDbtTranslator(
DagsterDbtTranslatorSettings(enable_asset_checks=True)
DagsterDbtTranslatorSettings(enable_asset_checks=True,
enable_code_references=True,)
),
).stream().fetch_row_counts().fetch_column_metadata()

Expand Down
7 changes: 6 additions & 1 deletion hooli_data_eng/definitions.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from pathlib import Path

from dagster import (
Definitions,
load_assets_from_modules,
load_assets_from_package_module,
build_column_schema_change_checks,
multiprocess_executor,
with_source_code_references,
)
from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud

from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets
from hooli_data_eng.assets.dbt_assets import dbt_slim_ci_job
Expand Down Expand Up @@ -65,7 +68,9 @@
executor=multiprocess_executor.configured(
{"max_concurrent": 3}
),
assets=[*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets], #
assets=link_code_references_to_git_if_cloud(
with_source_code_references([*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets]),
),
asset_checks=[*raw_data_schema_checks, *dbt_asset_checks, check_users, check_avg_orders, *min_order_freshness_check, *avg_orders_freshness_check, *weekly_freshness_check],
resources=resource_def[get_env()],
schedules=[analytics_schedule, avg_orders_freshness_check_schedule],
Expand Down
18 changes: 16 additions & 2 deletions hooli_snowflake_insights/definitions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import os
from pathlib import Path

from dagster import Definitions, EnvVar, ResourceDefinition
from dagster import (
AnchorBasedFilePathMapping,
Definitions,
EnvVar,
ResourceDefinition,
with_source_code_references,
)
from dagster_cloud.dagster_insights import (
create_snowflake_insights_asset_and_schedule,
)
from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud
from dagster_snowflake import SnowflakeResource

# Used to derive environment (LOCAL, BRANCH, PROD)
Expand Down Expand Up @@ -43,7 +51,13 @@ def get_env():
)

defs = Definitions(
assets=[*snowflake_insights_definitions.assets,],
assets=link_code_references_to_git_if_cloud(
with_source_code_references([*snowflake_insights_definitions.assets,]),
file_path_mapping=AnchorBasedFilePathMapping(
local_file_anchor=Path(__file__),
file_anchor_path_in_repository="hooli_snowflake_insights/definitions.py",
),
),
schedules=[snowflake_insights_definitions.schedule,],
resources=resource_def[get_env()],
)
6 changes: 5 additions & 1 deletion workspace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,8 @@ load_from:
# Example of deploying multiple code locations locally
# - python_module:
# module_name: hooli_demo_assets
# working_directory: hooli-demo-assets/
# working_directory: hooli-demo-assets/
# - python_file: hooli_basics/definitions.py
# - python_module:
# module_name: dagster_batch_enrichment
# working_directory: hooli_batch_enrichment/

0 comments on commit 4aa28f2

Please sign in to comment.