From 9b997a47020114b4c5a24fc5fd4beeb5b099c057 Mon Sep 17 00:00:00 2001 From: izzy Date: Fri, 12 Jul 2024 15:49:55 -0600 Subject: [PATCH 1/4] updated name to be more descriptive --- hooli_data_eng/utils/storage_kind_helpers.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 hooli_data_eng/utils/storage_kind_helpers.py diff --git a/hooli_data_eng/utils/storage_kind_helpers.py b/hooli_data_eng/utils/storage_kind_helpers.py new file mode 100644 index 00000000..15270d70 --- /dev/null +++ b/hooli_data_eng/utils/storage_kind_helpers.py @@ -0,0 +1,15 @@ +import os + +def get_storage_kind() -> str: + """ + Determine the storage kind based on the environment. + + Returns: + str: The storage kind ('snowflake' or 'duckdb'). + """ + if ( + os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod" or + os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1" + ): + return "snowflake" + return "duckdb" \ No newline at end of file From 6f1ee2e2eae91e89d938849ec3e0cf7dd488bcf9 Mon Sep 17 00:00:00 2001 From: izzy Date: Fri, 12 Jul 2024 15:50:35 -0600 Subject: [PATCH 2/4] added missing compute and storage kinds --- .../hooli_demo_assets/assets/sling.py | 4 +++ hooli_basics/definitions.py | 16 +++++++++--- .../dagster_batch_enrichment/assets.py | 25 +++++++++++++------ hooli_data_eng/assets/forecasting/__init__.py | 4 ++- hooli_data_eng/assets/marketing/__init__.py | 2 +- hooli_data_eng/assets/raw_data/__init__.py | 2 +- hooli_data_eng/utils/config_utils.py | 15 ----------- 7 files changed, 40 insertions(+), 28 deletions(-) delete mode 100644 hooli_data_eng/utils/config_utils.py diff --git a/hooli-demo-assets/hooli_demo_assets/assets/sling.py b/hooli-demo-assets/hooli_demo_assets/assets/sling.py index faf36abc..fafaaf26 100644 --- a/hooli-demo-assets/hooli_demo_assets/assets/sling.py +++ b/hooli-demo-assets/hooli_demo_assets/assets/sling.py @@ -1,3 +1,4 @@ +from dagster._core.definitions.tags import StorageKindTagSet from dagster_embedded_elt.sling import ( sling_assets, SlingResource, @@ -13,6 +14,9 @@ def __init__(self, target_prefix="RAW_DATA"): def get_group_name(self, stream_definition): return "RAW_DATA" + + def get_tags(self, stream_definition): + return {**StorageKindTagSet(storage_kind="S3")} @sling_assets( diff --git a/hooli_basics/definitions.py b/hooli_basics/definitions.py index d124c684..b7d54b81 100644 --- a/hooli_basics/definitions.py +++ b/hooli_basics/definitions.py @@ -8,11 +8,15 @@ Definitions, with_source_code_references, ) +from dagster._core.definitions.tags import StorageKindTagSet from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud from pandas import DataFrame, read_html, get_dummies, to_numeric from sklearn.linear_model import LinearRegression as Regression -@asset +@asset( + compute_kind="Kubernetes", + tags={**StorageKindTagSet(storage_kind="S3")}, +) def country_stats() -> DataFrame: df = read_html("https://tinyurl.com/mry64ebh", flavor='html5lib')[0] df.columns = ["country", "continent", "region", "pop_2022", "pop_2023", "pop_change"] @@ -25,13 +29,19 @@ def country_stats() -> DataFrame: def check_country_stats(country_stats): return AssetCheckResult(success=True) -@asset +@asset( + compute_kind="Kubernetes", + tags={**StorageKindTagSet(storage_kind="S3")}, +) def change_model(country_stats: DataFrame) -> Regression: data = country_stats.dropna(subset=["pop_change"]) dummies = get_dummies(data[["continent"]]) return Regression().fit(dummies, data["pop_change"]) -@asset +@asset( + compute_kind="Kubernetes", + tags={**StorageKindTagSet(storage_kind="S3")}, +) def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataFrame: result = country_stats.groupby("continent").sum() result["pop_change_factor"] = change_model.coef_ diff --git a/hooli_batch_enrichment/dagster_batch_enrichment/assets.py b/hooli_batch_enrichment/dagster_batch_enrichment/assets.py index 3a0db9ad..c4d9da34 100644 --- a/hooli_batch_enrichment/dagster_batch_enrichment/assets.py +++ b/hooli_batch_enrichment/dagster_batch_enrichment/assets.py @@ -1,19 +1,26 @@ +import json + from dagster import asset, OpExecutionContext ,MetadataValue, DynamicOut, Config, op, DynamicOutput, Out, graph_asset, RetryPolicy, Config +from dagster._core.definitions.tags import StorageKindTagSet +import pandas as pd +from pydantic import Field +import numpy as np + from dagster_batch_enrichment.warehouse import MyWarehouse from dagster_batch_enrichment.api import EnrichmentAPI -import numpy as np -from pydantic import Field -import pandas as pd -import json + class experimentConfig(Config): experiment_name: str -@asset +@asset( + compute_kind="Kubernetes", + tags={**StorageKindTagSet(storage_kind="S3")}, +) def raw_data( context: OpExecutionContext, warehouse: MyWarehouse, - config: experimentConfig + config: experimentConfig, ): """ Placeholder for querying a real data source""" orders_to_process = warehouse.get_raw_data() @@ -80,7 +87,11 @@ def concat_chunk_list(chunks) -> pd.DataFrame: return pd.concat(chunks) -@graph_asset +@graph_asset( + tags={ + **StorageKindTagSet(storage_kind="S3"), + }, +) def enriched_data(raw_data) -> pd.DataFrame: """Full enrichment process""" chunks = split_rows(raw_data) diff --git a/hooli_data_eng/assets/forecasting/__init__.py b/hooli_data_eng/assets/forecasting/__init__.py index a8b3840b..5004ce30 100644 --- a/hooli_data_eng/assets/forecasting/__init__.py +++ b/hooli_data_eng/assets/forecasting/__init__.py @@ -22,7 +22,7 @@ from databricks.sdk.service import jobs from pydantic import Field -from hooli_data_eng.utils.config_utils import get_storage_kind +from hooli_data_eng.utils.storage_kind_helpers import get_storage_kind # dynamically determine storage_kind based on environment @@ -187,6 +187,7 @@ def big_orders(context, predicted_orders: pd.DataFrame): "order_forecast_model": AssetIn(), }, required_resource_keys={"io_manager"}, + asset_tags={**StorageKindTagSet(storage_kind="S3")}, ) @@ -245,6 +246,7 @@ def databricks_asset( @asset( deps=[predicted_orders], compute_kind="kubernetes", + tags={**StorageKindTagSet(storage_kind="S3")}, ) def k8s_pod_asset( context: AssetExecutionContext, diff --git a/hooli_data_eng/assets/marketing/__init__.py b/hooli_data_eng/assets/marketing/__init__.py index dc201c3f..31568449 100644 --- a/hooli_data_eng/assets/marketing/__init__.py +++ b/hooli_data_eng/assets/marketing/__init__.py @@ -20,7 +20,7 @@ import pandas as pd from hooli_data_eng.assets.dbt_assets import allow_outdated_parents_policy -from hooli_data_eng.utils.config_utils import get_storage_kind +from hooli_data_eng.utils.storage_kind_helpers import get_storage_kind # dynamically determine storage_kind based on environment diff --git a/hooli_data_eng/assets/raw_data/__init__.py b/hooli_data_eng/assets/raw_data/__init__.py index 11116ffb..9c4262a2 100644 --- a/hooli_data_eng/assets/raw_data/__init__.py +++ b/hooli_data_eng/assets/raw_data/__init__.py @@ -19,7 +19,7 @@ import pandas as pd from hooli_data_eng.resources.api import RawDataAPI -from hooli_data_eng.utils.config_utils import get_storage_kind +from hooli_data_eng.utils.storage_kind_helpers import get_storage_kind # dynamically determine storage_kind based on environment diff --git a/hooli_data_eng/utils/config_utils.py b/hooli_data_eng/utils/config_utils.py deleted file mode 100644 index 15270d70..00000000 --- a/hooli_data_eng/utils/config_utils.py +++ /dev/null @@ -1,15 +0,0 @@ -import os - -def get_storage_kind() -> str: - """ - Determine the storage kind based on the environment. - - Returns: - str: The storage kind ('snowflake' or 'duckdb'). - """ - if ( - os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod" or - os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1" - ): - return "snowflake" - return "duckdb" \ No newline at end of file From bc9586bbc4a2fb0aa2f5469831373da89c531dfe Mon Sep 17 00:00:00 2001 From: Sean Lopp Date: Tue, 16 Jul 2024 12:16:49 -0600 Subject: [PATCH 3/4] add anchor path --- hooli_data_eng/definitions.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index 8bbdaed1..1e8ca2ea 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -1,6 +1,7 @@ from pathlib import Path from dagster import ( + AnchorBasedFilePathMapping, Definitions, load_assets_from_modules, load_assets_from_package_module, @@ -70,6 +71,10 @@ ), assets=link_code_references_to_git_if_cloud( with_source_code_references([*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets]), + file_path_mapping=AnchorBasedFilePathMapping( + local_file_anchor=Path(__file__), + file_anchor_path_in_repository="hooli_data_eng/definitions.py" + ) ), asset_checks=[*raw_data_schema_checks, *dbt_asset_checks, check_users, check_avg_orders, *min_order_freshness_check, *avg_orders_freshness_check, *weekly_freshness_check], resources=resource_def[get_env()], From fe791dac9921b799df6d028d45e7ec5baff10ada Mon Sep 17 00:00:00 2001 From: izzy Date: Tue, 16 Jul 2024 16:20:14 -0600 Subject: [PATCH 4/4] derive storage_kind dynamically from the target specified in the replication_config --- hooli-demo-assets/hooli_demo_assets/assets/sling.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/hooli-demo-assets/hooli_demo_assets/assets/sling.py b/hooli-demo-assets/hooli_demo_assets/assets/sling.py index fafaaf26..fce8a609 100644 --- a/hooli-demo-assets/hooli_demo_assets/assets/sling.py +++ b/hooli-demo-assets/hooli_demo_assets/assets/sling.py @@ -11,12 +11,17 @@ class CustomSlingTranslator(DagsterSlingTranslator): def __init__(self, target_prefix="RAW_DATA"): super().__init__(target_prefix=target_prefix) + self.replication_config = replication_config def get_group_name(self, stream_definition): return "RAW_DATA" def get_tags(self, stream_definition): - return {**StorageKindTagSet(storage_kind="S3")} + # derive storage_kind from the target set in the replication_config + storage_kind = self.replication_config.get("target", "DUCKDB") + if storage_kind.startswith("SNOWFLAKE"): + storage_kind = "SNOWFLAKE" + return {**StorageKindTagSet(storage_kind=storage_kind)} @sling_assets(