Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add storage_kind tag #95

Merged
merged 3 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions hooli_data_eng/assets/forecasting/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import Any, Tuple
from dagster_k8s import PipesK8sClient

import numpy as np
import pandas as pd
Expand All @@ -15,12 +14,20 @@
AssetExecutionContext,
MaterializeResult,
)
from dagster_k8s import PipesK8sClient
from dagstermill import define_dagstermill_asset
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._utils import file_relative_path
from dagster_databricks import PipesDatabricksClient
from databricks.sdk.service import jobs
from pydantic import Field

from hooli_data_eng.utils.config_utils import get_storage_kind


# dynamically determine storage_kind based on environment
storage_kind = get_storage_kind()


def model_func(x, a, b):
return a * np.exp(b * (x / 10**18 - 1.6095))
Expand Down Expand Up @@ -57,6 +64,7 @@ class modelHyperParams(Config):
ins={"weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"])},
compute_kind="scikitlearn",
io_manager_key="model_io_manager",
tags={**StorageKindTagSet(storage_kind="s3")},
)
def order_forecast_model(
context, weekly_order_summary: pd.DataFrame, config: modelHyperParams
Expand Down Expand Up @@ -91,8 +99,9 @@ def order_forecast_model(
key_prefix=["forecasting"],
io_manager_key="model_io_manager",
partitions_def=MonthlyPartitionsDefinition(start_date="2022-01-01"),
tags={"core_kpis":""}
)
tags={"core_kpis":"",
**StorageKindTagSet(storage_kind=storage_kind)},
)
def model_stats_by_month(
context,
weekly_order_summary: pd.DataFrame,
Expand Down Expand Up @@ -130,6 +139,7 @@ def model_stats_by_month(
},
compute_kind="pandas",
key_prefix=["FORECASTING"],
tags={**StorageKindTagSet(storage_kind=storage_kind)},
)
def predicted_orders(
weekly_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float]
Expand Down Expand Up @@ -157,6 +167,7 @@ def predicted_orders(
key_prefix=["FORECASTING"],
required_resource_keys={"step_launcher", "pyspark"},
metadata={"resource_constrained_at": 50},
tags={**StorageKindTagSet(storage_kind="databricks")},
)
def big_orders(context, predicted_orders: pd.DataFrame):
"""Days where predicted orders surpass our current carrying capacity"""
Expand Down Expand Up @@ -185,6 +196,7 @@ def big_orders(context, predicted_orders: pd.DataFrame):
@asset(
deps=[predicted_orders],
compute_kind="databricks",
tags={**StorageKindTagSet(storage_kind="databricks")},
)
def databricks_asset(
context: AssetExecutionContext,
Expand Down
17 changes: 13 additions & 4 deletions hooli_data_eng/assets/marketing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import datetime

from dagster import (
asset,
build_last_update_freshness_checks,
Expand All @@ -9,16 +11,20 @@
AssetCheckResult,
asset_check,
AssetKey,
FreshnessPolicy,
define_asset_job,
ScheduleDefinition,
AssetSelection
)
import pandas as pd
import datetime
from dagster._core.definitions.tags import StorageKindTagSet
from dagster_cloud.anomaly_detection import build_anomaly_detection_freshness_checks
import pandas as pd

from hooli_data_eng.assets.dbt_assets import allow_outdated_parents_policy
from hooli_data_eng.utils.config_utils import get_storage_kind


# dynamically determine storage_kind based on environment
storage_kind = get_storage_kind()


# These assets take data from a SQL table managed by
Expand All @@ -29,6 +35,7 @@
compute_kind="pandas",
owners=["team:programmers", "[email protected]"],
ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])},
tags={**StorageKindTagSet(storage_kind=storage_kind)},
)
def avg_orders(
context: AssetExecutionContext, company_perf: pd.DataFrame
Expand All @@ -51,9 +58,10 @@ def check_avg_orders(context, avg_orders: pd.DataFrame):

@asset(
key_prefix="MARKETING",
compute_kind="snowflake",
compute_kind="pandas",
owners=["team:programmers"],
ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])},
tags={**StorageKindTagSet(storage_kind=storage_kind)},
)
def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame:
"""Computes min order KPI"""
Expand All @@ -73,6 +81,7 @@ def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame:
compute_kind="hex",
key_prefix="MARKETING",
ins={"sku_stats": AssetIn(key_prefix=["ANALYTICS"])},
tags={**StorageKindTagSet(storage_kind="s3")},
)
def key_product_deepdive(context, sku_stats):
"""Creates a file for a BI tool based on the current quarters top product, represented as a dynamic partition"""
Expand Down
19 changes: 12 additions & 7 deletions hooli_data_eng/assets/raw_data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@
AssetCheckSeverity,
AssetCheckResult,
AssetKey,
build_column_schema_change_checks,
BackfillPolicy,
Backoff,
build_column_schema_change_checks,
Backoff,
DailyPartitionsDefinition,
Jitter,
MetadataValue,
RetryPolicy,
)
from dagster._core.definitions.tags import StorageKindTagSet
import pandas as pd


from hooli_data_eng.resources.api import RawDataAPI
from hooli_data_eng.utils.config_utils import get_storage_kind


# dynamically determine storage_kind based on environment
storage_kind = get_storage_kind()


daily_partitions = DailyPartitionsDefinition(
Expand All @@ -38,7 +44,8 @@ def _daily_partition_seq(start, end):
partitions_def=daily_partitions,
metadata={"partition_expr": "created_at"},
backfill_policy=BackfillPolicy.single_run(),
tags={"core_kpis":""}
tags={"core_kpis":"",
**StorageKindTagSet(storage_kind=storage_kind)},
)
def users(context, api: RawDataAPI) -> pd.DataFrame:
"""A table containing all users data"""
Expand Down Expand Up @@ -84,7 +91,8 @@ def check_users(context, users: pd.DataFrame):
backoff=Backoff.LINEAR,
jitter=Jitter.FULL
),
backfill_policy=BackfillPolicy.single_run()
backfill_policy=BackfillPolicy.single_run(),
tags={**StorageKindTagSet(storage_kind=storage_kind)},
)
def orders(context, api: RawDataAPI) -> pd.DataFrame:
"""A table containing all orders that have been placed"""
Expand All @@ -104,6 +112,3 @@ def orders(context, api: RawDataAPI) -> pd.DataFrame:
AssetKey(["RAW_DATA", "orders"]),
AssetKey(["RAW_DATA", "users"]),
])


from dagster_dbt import dbt_assets
15 changes: 15 additions & 0 deletions hooli_data_eng/utils/config_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import os

def get_storage_kind() -> str:
    """Return the storage kind for the current deployment environment.

    Dagster Cloud production ("data-eng-prod") and branch deployments
    are backed by Snowflake; any other environment (e.g. local dev)
    falls back to DuckDB.

    Returns:
        str: Either "snowflake" or "duckdb".
    """
    deployment_name = os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "")
    is_branch_deployment = os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "")
    return (
        "snowflake"
        if deployment_name == "data-eng-prod" or is_branch_deployment == "1"
        else "duckdb"
    )
Loading