Skip to content

Commit

Permalink
Merge pull request #106 from dagster-io/izzy/add_missing_storage_kinds
Browse files Browse the repository at this point in the history
Add missing compute and storage kind tags
  • Loading branch information
izzye84 authored Jul 16, 2024
2 parents 86e197a + fe791da commit 5e24870
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 13 deletions.
9 changes: 9 additions & 0 deletions hooli-demo-assets/hooli_demo_assets/assets/sling.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from dagster._core.definitions.tags import StorageKindTagSet
from dagster_embedded_elt.sling import (
sling_assets,
SlingResource,
Expand All @@ -10,9 +11,17 @@
class CustomSlingTranslator(DagsterSlingTranslator):
def __init__(self, target_prefix="RAW_DATA"):
super().__init__(target_prefix=target_prefix)
self.replication_config = replication_config

def get_group_name(self, stream_definition):
return "RAW_DATA"

def get_tags(self, stream_definition):
# derive storage_kind from the target set in the replication_config
storage_kind = self.replication_config.get("target", "DUCKDB")
if storage_kind.startswith("SNOWFLAKE"):
storage_kind = "SNOWFLAKE"
return {**StorageKindTagSet(storage_kind=storage_kind)}


@sling_assets(
Expand Down
16 changes: 13 additions & 3 deletions hooli_basics/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@
Definitions,
with_source_code_references,
)
from dagster._core.definitions.tags import StorageKindTagSet
from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud
from pandas import DataFrame, read_html, get_dummies, to_numeric
from sklearn.linear_model import LinearRegression as Regression

@asset
@asset(
compute_kind="Kubernetes",
tags={**StorageKindTagSet(storage_kind="S3")},
)
def country_stats() -> DataFrame:
df = read_html("https://tinyurl.com/mry64ebh", flavor='html5lib')[0]
df.columns = ["country", "continent", "region", "pop_2022", "pop_2023", "pop_change"]
Expand All @@ -25,13 +29,19 @@ def country_stats() -> DataFrame:
def check_country_stats(country_stats):
return AssetCheckResult(success=True)

@asset
@asset(
compute_kind="Kubernetes",
tags={**StorageKindTagSet(storage_kind="S3")},
)
def change_model(country_stats: DataFrame) -> Regression:
data = country_stats.dropna(subset=["pop_change"])
dummies = get_dummies(data[["continent"]])
return Regression().fit(dummies, data["pop_change"])

@asset
@asset(
compute_kind="Kubernetes",
tags={**StorageKindTagSet(storage_kind="S3")},
)
def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataFrame:
result = country_stats.groupby("continent").sum()
result["pop_change_factor"] = change_model.coef_
Expand Down
25 changes: 18 additions & 7 deletions hooli_batch_enrichment/dagster_batch_enrichment/assets.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
import json

from dagster import asset, OpExecutionContext ,MetadataValue, DynamicOut, Config, op, DynamicOutput, Out, graph_asset, RetryPolicy, Config
from dagster._core.definitions.tags import StorageKindTagSet
import pandas as pd
from pydantic import Field
import numpy as np

from dagster_batch_enrichment.warehouse import MyWarehouse
from dagster_batch_enrichment.api import EnrichmentAPI
import numpy as np
from pydantic import Field
import pandas as pd
import json


class experimentConfig(Config):
experiment_name: str

@asset
@asset(
compute_kind="Kubernetes",
tags={**StorageKindTagSet(storage_kind="S3")},
)
def raw_data(
context: OpExecutionContext,
warehouse: MyWarehouse,
config: experimentConfig
config: experimentConfig,
):
""" Placeholder for querying a real data source"""
orders_to_process = warehouse.get_raw_data()
Expand Down Expand Up @@ -80,7 +87,11 @@ def concat_chunk_list(chunks) -> pd.DataFrame:
return pd.concat(chunks)


@graph_asset
@graph_asset(
tags={
**StorageKindTagSet(storage_kind="S3"),
},
)
def enriched_data(raw_data) -> pd.DataFrame:
"""Full enrichment process"""
chunks = split_rows(raw_data)
Expand Down
4 changes: 3 additions & 1 deletion hooli_data_eng/assets/forecasting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from databricks.sdk.service import jobs
from pydantic import Field

from hooli_data_eng.utils.config_utils import get_storage_kind
from hooli_data_eng.utils.storage_kind_helpers import get_storage_kind


# dynamically determine storage_kind based on environment
Expand Down Expand Up @@ -187,6 +187,7 @@ def big_orders(context, predicted_orders: pd.DataFrame):
"order_forecast_model": AssetIn(),
},
required_resource_keys={"io_manager"},
asset_tags={**StorageKindTagSet(storage_kind="S3")},
)


Expand Down Expand Up @@ -245,6 +246,7 @@ def databricks_asset(
@asset(
deps=[predicted_orders],
compute_kind="kubernetes",
tags={**StorageKindTagSet(storage_kind="S3")},
)
def k8s_pod_asset(
context: AssetExecutionContext,
Expand Down
2 changes: 1 addition & 1 deletion hooli_data_eng/assets/marketing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import pandas as pd

from hooli_data_eng.assets.dbt_assets import allow_outdated_parents_policy
from hooli_data_eng.utils.config_utils import get_storage_kind
from hooli_data_eng.utils.storage_kind_helpers import get_storage_kind


# dynamically determine storage_kind based on environment
Expand Down
2 changes: 1 addition & 1 deletion hooli_data_eng/assets/raw_data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import pandas as pd

from hooli_data_eng.resources.api import RawDataAPI
from hooli_data_eng.utils.config_utils import get_storage_kind
from hooli_data_eng.utils.storage_kind_helpers import get_storage_kind


# dynamically determine storage_kind based on environment
Expand Down
File renamed without changes.

0 comments on commit 5e24870

Please sign in to comment.