Skip to content

Commit

Permalink
Merge pull request #95 from dagster-io/izzy/add_storage_kind_tags
Browse files Browse the repository at this point in the history
Add storage_kind tag
  • Loading branch information
izzye84 authored Jun 7, 2024
2 parents 4fde4cd + 7658c22 commit 09f5a45
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 14 deletions.
18 changes: 15 additions & 3 deletions hooli_data_eng/assets/forecasting/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import Any, Tuple
from dagster_k8s import PipesK8sClient

import numpy as np
import pandas as pd
Expand All @@ -15,12 +14,20 @@
AssetExecutionContext,
MaterializeResult,
)
from dagster_k8s import PipesK8sClient
from dagstermill import define_dagstermill_asset
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._utils import file_relative_path
from dagster_databricks import PipesDatabricksClient
from databricks.sdk.service import jobs
from pydantic import Field

from hooli_data_eng.utils.config_utils import get_storage_kind


# dynamically determine storage_kind based on environment
storage_kind = get_storage_kind()


def model_func(x, a, b):
    """Exponential curve used to fit order volume over time.

    Args:
        x: independent variable (raw timestamp-scale values; divided by 1e18
           to bring it into a small numeric range — assumes ns-epoch input,
           TODO confirm with callers).
        a: amplitude coefficient.
        b: growth-rate coefficient.

    Returns:
        a * exp(b * (x / 1e18 - 1.6095)), elementwise if x is an array.
    """
    # Rescale x before exponentiating to keep the argument numerically small.
    rescaled = x / 10**18 - 1.6095
    return a * np.exp(b * rescaled)
Expand Down Expand Up @@ -57,6 +64,7 @@ class modelHyperParams(Config):
ins={"weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"])},
compute_kind="scikitlearn",
io_manager_key="model_io_manager",
tags={**StorageKindTagSet(storage_kind="s3")},
)
def order_forecast_model(
context, weekly_order_summary: pd.DataFrame, config: modelHyperParams
Expand Down Expand Up @@ -91,8 +99,9 @@ def order_forecast_model(
key_prefix=["forecasting"],
io_manager_key="model_io_manager",
partitions_def=MonthlyPartitionsDefinition(start_date="2022-01-01"),
tags={"core_kpis":""}
)
tags={"core_kpis":"",
**StorageKindTagSet(storage_kind=storage_kind)},
)
def model_stats_by_month(
context,
weekly_order_summary: pd.DataFrame,
Expand Down Expand Up @@ -130,6 +139,7 @@ def model_stats_by_month(
},
compute_kind="pandas",
key_prefix=["FORECASTING"],
tags={**StorageKindTagSet(storage_kind=storage_kind)},
)
def predicted_orders(
weekly_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float]
Expand Down Expand Up @@ -157,6 +167,7 @@ def predicted_orders(
key_prefix=["FORECASTING"],
required_resource_keys={"step_launcher", "pyspark"},
metadata={"resource_constrained_at": 50},
tags={**StorageKindTagSet(storage_kind="databricks")},
)
def big_orders(context, predicted_orders: pd.DataFrame):
"""Days where predicted orders surpass our current carrying capacity"""
Expand Down Expand Up @@ -185,6 +196,7 @@ def big_orders(context, predicted_orders: pd.DataFrame):
@asset(
deps=[predicted_orders],
compute_kind="databricks",
tags={**StorageKindTagSet(storage_kind="databricks")},
)
def databricks_asset(
context: AssetExecutionContext,
Expand Down
17 changes: 13 additions & 4 deletions hooli_data_eng/assets/marketing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import datetime

from dagster import (
asset,
build_last_update_freshness_checks,
Expand All @@ -9,16 +11,20 @@
AssetCheckResult,
asset_check,
AssetKey,
FreshnessPolicy,
define_asset_job,
ScheduleDefinition,
AssetSelection
)
import pandas as pd
import datetime
from dagster._core.definitions.tags import StorageKindTagSet
from dagster_cloud.anomaly_detection import build_anomaly_detection_freshness_checks
import pandas as pd

from hooli_data_eng.assets.dbt_assets import allow_outdated_parents_policy
from hooli_data_eng.utils.config_utils import get_storage_kind


# dynamically determine storage_kind based on environment
storage_kind = get_storage_kind()


# These assets take data from a SQL table managed by
Expand All @@ -29,6 +35,7 @@
compute_kind="pandas",
owners=["team:programmers", "[email protected]"],
ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])},
tags={**StorageKindTagSet(storage_kind=storage_kind)},
)
def avg_orders(
context: AssetExecutionContext, company_perf: pd.DataFrame
Expand All @@ -51,9 +58,10 @@ def check_avg_orders(context, avg_orders: pd.DataFrame):

@asset(
key_prefix="MARKETING",
compute_kind="snowflake",
compute_kind="pandas",
owners=["team:programmers"],
ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])},
tags={**StorageKindTagSet(storage_kind=storage_kind)},
)
def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame:
"""Computes min order KPI"""
Expand All @@ -73,6 +81,7 @@ def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame:
compute_kind="hex",
key_prefix="MARKETING",
ins={"sku_stats": AssetIn(key_prefix=["ANALYTICS"])},
tags={**StorageKindTagSet(storage_kind="s3")},
)
def key_product_deepdive(context, sku_stats):
"""Creates a file for a BI tool based on the current quarters top product, represented as a dynamic partition"""
Expand Down
19 changes: 12 additions & 7 deletions hooli_data_eng/assets/raw_data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@
AssetCheckSeverity,
AssetCheckResult,
AssetKey,
build_column_schema_change_checks,
BackfillPolicy,
Backoff,
build_column_schema_change_checks,
Backoff,
DailyPartitionsDefinition,
Jitter,
MetadataValue,
RetryPolicy,
)
from dagster._core.definitions.tags import StorageKindTagSet
import pandas as pd


from hooli_data_eng.resources.api import RawDataAPI
from hooli_data_eng.utils.config_utils import get_storage_kind


# dynamically determine storage_kind based on environment
storage_kind = get_storage_kind()


daily_partitions = DailyPartitionsDefinition(
Expand All @@ -38,7 +44,8 @@ def _daily_partition_seq(start, end):
partitions_def=daily_partitions,
metadata={"partition_expr": "created_at"},
backfill_policy=BackfillPolicy.single_run(),
tags={"core_kpis":""}
tags={"core_kpis":"",
**StorageKindTagSet(storage_kind=storage_kind)},
)
def users(context, api: RawDataAPI) -> pd.DataFrame:
"""A table containing all users data"""
Expand Down Expand Up @@ -84,7 +91,8 @@ def check_users(context, users: pd.DataFrame):
backoff=Backoff.LINEAR,
jitter=Jitter.FULL
),
backfill_policy=BackfillPolicy.single_run()
backfill_policy=BackfillPolicy.single_run(),
tags={**StorageKindTagSet(storage_kind=storage_kind)},
)
def orders(context, api: RawDataAPI) -> pd.DataFrame:
"""A table containing all orders that have been placed"""
Expand All @@ -104,6 +112,3 @@ def orders(context, api: RawDataAPI) -> pd.DataFrame:
AssetKey(["RAW_DATA", "orders"]),
AssetKey(["RAW_DATA", "users"]),
])


from dagster_dbt import dbt_assets
15 changes: 15 additions & 0 deletions hooli_data_eng/utils/config_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import os

def get_storage_kind() -> str:
    """Determine the storage kind based on the deployment environment.

    Production and branch deployments on Dagster Cloud use Snowflake;
    everything else (local dev, tests) falls back to DuckDB.

    Returns:
        str: The storage kind ('snowflake' or 'duckdb').
    """
    deployment_name = os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "")
    is_branch_deployment = os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "")
    # Branch deployments mirror prod so they exercise the same warehouse.
    if deployment_name == "data-eng-prod" or is_branch_deployment == "1":
        return "snowflake"
    return "duckdb"

0 comments on commit 09f5a45

Please sign in to comment.