diff --git a/hooli_data_eng/assets/forecasting/__init__.py b/hooli_data_eng/assets/forecasting/__init__.py
index d9a2beb..a8b3840 100644
--- a/hooli_data_eng/assets/forecasting/__init__.py
+++ b/hooli_data_eng/assets/forecasting/__init__.py
@@ -1,5 +1,4 @@
 from typing import Any, Tuple
-from dagster_k8s import PipesK8sClient
 
 import numpy as np
 import pandas as pd
@@ -15,12 +14,20 @@
     AssetExecutionContext,
     MaterializeResult,
 )
+from dagster_k8s import PipesK8sClient
 from dagstermill import define_dagstermill_asset
+from dagster._core.definitions.tags import StorageKindTagSet
 from dagster._utils import file_relative_path
 from dagster_databricks import PipesDatabricksClient
 from databricks.sdk.service import jobs
 from pydantic import Field
 
+from hooli_data_eng.utils.config_utils import get_storage_kind
+
+
+# dynamically determine storage_kind based on environment
+storage_kind = get_storage_kind()
+
 
 def model_func(x, a, b):
     return a * np.exp(b * (x / 10**18 - 1.6095))
@@ -57,6 +64,7 @@ class modelHyperParams(Config):
     ins={"weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"])},
     compute_kind="scikitlearn",
     io_manager_key="model_io_manager",
+    tags={**StorageKindTagSet(storage_kind="s3")},
 )
 def order_forecast_model(
     context, weekly_order_summary: pd.DataFrame, config: modelHyperParams
@@ -91,8 +99,9 @@ def order_forecast_model(
     key_prefix=["forecasting"],
     io_manager_key="model_io_manager",
     partitions_def=MonthlyPartitionsDefinition(start_date="2022-01-01"),
-    tags={"core_kpis":""}
- )
+    tags={"core_kpis":"",
+        **StorageKindTagSet(storage_kind=storage_kind)},
+)
 def model_stats_by_month(
     context,
     weekly_order_summary: pd.DataFrame,
@@ -130,6 +139,7 @@ def model_stats_by_month(
     },
     compute_kind="pandas",
     key_prefix=["FORECASTING"],
+    tags={**StorageKindTagSet(storage_kind=storage_kind)},
 )
 def predicted_orders(
     weekly_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float]
@@ -157,6 +167,7 @@ def predicted_orders(
     key_prefix=["FORECASTING"],
     required_resource_keys={"step_launcher", "pyspark"},
     metadata={"resource_constrained_at": 50},
+    tags={**StorageKindTagSet(storage_kind="databricks")},
 )
 def big_orders(context, predicted_orders: pd.DataFrame):
     """Days where predicted orders surpass our current carrying capacity"""
@@ -185,6 +196,7 @@ def big_orders(context, predicted_orders: pd.DataFrame):
 @asset(
     deps=[predicted_orders],
     compute_kind="databricks",
+    tags={**StorageKindTagSet(storage_kind="databricks")},
 )
 def databricks_asset(
     context: AssetExecutionContext,
diff --git a/hooli_data_eng/assets/marketing/__init__.py b/hooli_data_eng/assets/marketing/__init__.py
index 9b6017e..dc201c3 100644
--- a/hooli_data_eng/assets/marketing/__init__.py
+++ b/hooli_data_eng/assets/marketing/__init__.py
@@ -1,3 +1,5 @@
+import datetime
+
 from dagster import (
     asset,
     build_last_update_freshness_checks,
@@ -9,16 +11,20 @@
     AssetCheckResult,
     asset_check,
     AssetKey,
-    FreshnessPolicy,
     define_asset_job,
     ScheduleDefinition,
     AssetSelection
 )
-import pandas as pd
-import datetime
+from dagster._core.definitions.tags import StorageKindTagSet
 from dagster_cloud.anomaly_detection import build_anomaly_detection_freshness_checks
+import pandas as pd
 
 from hooli_data_eng.assets.dbt_assets import allow_outdated_parents_policy
+from hooli_data_eng.utils.config_utils import get_storage_kind
+
+
+# dynamically determine storage_kind based on environment
+storage_kind = get_storage_kind()
 
 
 # These assets take data from a SQL table managed by
@@ -29,6 +35,7 @@
     compute_kind="pandas",
     owners=["team:programmers", "lopp@dagsterlabs.com"],
     ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])},
+    tags={**StorageKindTagSet(storage_kind=storage_kind)},
 )
 def avg_orders(
     context: AssetExecutionContext, company_perf: pd.DataFrame
@@ -51,9 +58,10 @@ def check_avg_orders(context, avg_orders: pd.DataFrame):
 
 @asset(
     key_prefix="MARKETING",
-    compute_kind="snowflake",
+    compute_kind="pandas",
     owners=["team:programmers"],
     ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])},
+    tags={**StorageKindTagSet(storage_kind=storage_kind)},
 )
 def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame:
     """Computes min order KPI"""
@@ -73,6 +81,7 @@ def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame:
     compute_kind="hex",
     key_prefix="MARKETING",
     ins={"sku_stats": AssetIn(key_prefix=["ANALYTICS"])},
+    tags={**StorageKindTagSet(storage_kind="s3")},
 )
 def key_product_deepdive(context, sku_stats):
     """Creates a file for a BI tool based on the current quarters top product, represented as a dynamic partition"""
diff --git a/hooli_data_eng/assets/raw_data/__init__.py b/hooli_data_eng/assets/raw_data/__init__.py
index bf47e6d..11116ff 100644
--- a/hooli_data_eng/assets/raw_data/__init__.py
+++ b/hooli_data_eng/assets/raw_data/__init__.py
@@ -6,18 +6,24 @@
     AssetCheckSeverity,
     AssetCheckResult,
     AssetKey,
-    build_column_schema_change_checks,
     BackfillPolicy,
     Backoff,
+    build_column_schema_change_checks,
+    Backoff,
     DailyPartitionsDefinition,
     Jitter,
     MetadataValue,
     RetryPolicy,
 )
+from dagster._core.definitions.tags import StorageKindTagSet
 import pandas as pd
-
 from hooli_data_eng.resources.api import RawDataAPI
+from hooli_data_eng.utils.config_utils import get_storage_kind
+
+
+# dynamically determine storage_kind based on environment
+storage_kind = get_storage_kind()
 
 
 
 daily_partitions = DailyPartitionsDefinition(
@@ -38,7 +44,8 @@ def _daily_partition_seq(start, end):
     partitions_def=daily_partitions,
     metadata={"partition_expr": "created_at"},
     backfill_policy=BackfillPolicy.single_run(),
-    tags={"core_kpis":""}
+    tags={"core_kpis":"",
+        **StorageKindTagSet(storage_kind=storage_kind)},
 )
 def users(context, api: RawDataAPI) -> pd.DataFrame:
     """A table containing all users data"""
@@ -84,7 +91,8 @@ def check_users(context, users: pd.DataFrame):
         backoff=Backoff.LINEAR,
         jitter=Jitter.FULL
     ),
-    backfill_policy=BackfillPolicy.single_run()
+    backfill_policy=BackfillPolicy.single_run(),
+    tags={**StorageKindTagSet(storage_kind=storage_kind)},
 )
 def orders(context, api: RawDataAPI) -> pd.DataFrame:
     """A table containing all orders that have been placed"""
@@ -104,6 +112,3 @@ def orders(context, api: RawDataAPI) -> pd.DataFrame:
     AssetKey(["RAW_DATA", "orders"]),
     AssetKey(["RAW_DATA", "users"]),
 ])
-
-
-from dagster_dbt import dbt_assets
diff --git a/hooli_data_eng/utils/config_utils.py b/hooli_data_eng/utils/config_utils.py
new file mode 100644
index 0000000..15270d7
--- /dev/null
+++ b/hooli_data_eng/utils/config_utils.py
@@ -0,0 +1,15 @@
+import os
+
+def get_storage_kind() -> str:
+    """
+    Determine the storage kind based on the environment.
+
+    Returns:
+        str: The storage kind ('snowflake' or 'duckdb').
+    """
+    if (
+        os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod" or
+        os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1"
+    ):
+        return "snowflake"
+    return "duckdb"
\ No newline at end of file