Skip to content

Commit

Permalink
Merge pull request #126 from dagster-io/izzy/add_kinds
Browse files Browse the repository at this point in the history
Fixed storage kind tags
  • Loading branch information
izzye84 committed Sep 25, 2024
2 parents 9c2a6e8 + 9f5e64b commit 7cafaba
Show file tree
Hide file tree
Showing 17 changed files with 93 additions and 65 deletions.
14 changes: 7 additions & 7 deletions .github/workflows/deploy-dagster-cloud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -153,23 +153,23 @@ jobs:
with:
command: "ci set-build-output --location-name=snowflake_insights --image-tag=$IMAGE_TAG-snowflake-insights"

# Build 'demo_assets' code location
- name: Build and upload Docker image for demo_assets
# Build 'hooli_data_ingest' code location
- name: Build and upload Docker image for hooli_data_ingest
if: steps.prerun.outputs.result != 'skip'
uses: docker/build-push-action@v5
with:
context: ./hooli-demo-assets
context: ./hooli-data-ingest
push: true
tags: ${{ env.IMAGE_REGISTRY }}:${{ env.IMAGE_TAG }}-demo-assets
tags: ${{ env.IMAGE_REGISTRY }}:${{ env.IMAGE_TAG }}-hooli-data-ingest
cache-from: type=gha
cache-to: type=gha,mode=max

- name: Update build session with image tag for demo_assets
id: ci-set-build-output-demo-assets
- name: Update build session with image tag for hooli_data_ingest
id: ci-set-build-output-hooli-data-ingest
if: steps.prerun.outputs.result != 'skip'
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected]
with:
command: "ci set-build-output --location-name=demo_assets --image-tag=$IMAGE_TAG-demo-assets"
command: "ci set-build-output --location-name=hooli_data_ingest --image-tag=$IMAGE_TAG-hooli-data-ingest"

# Build pipes example container
- name: Build and upload Docker image for pipes example
Expand Down
6 changes: 3 additions & 3 deletions dagster_cloud.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ locations:
directory: ./hooli_snowflake_insights
registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod

- location_name: demo_assets
- location_name: hooli_data_ingest
code_source:
package_name: hooli_demo_assets
package_name: hooli_data_ingest
build:
directory: ./hooli-demo-assets
directory: ./hooli-data-ingest
registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod
File renamed without changes.
1 change: 1 addition & 0 deletions hooli-data-ingest/hooli_data_ingest/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from hooli_data_ingest.definitions import defs as defs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._core.definitions.tags import build_kind_tag
from dagster_embedded_elt.sling import (
sling_assets,
SlingResource,
)
from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator

from hooli_demo_assets.resources import replication_config
from hooli_data_ingest.resources import replication_config


class CustomSlingTranslator(DagsterSlingTranslator):
Expand All @@ -21,7 +21,7 @@ def get_tags(self, stream_definition):
storage_kind = self.replication_config.get("target", "DUCKDB")
if storage_kind.startswith("SNOWFLAKE"):
storage_kind = "SNOWFLAKE"
return {**StorageKindTagSet(storage_kind=storage_kind)}
return {**build_kind_tag(storage_kind)}


@sling_assets(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@
from dagster._core.definitions.metadata import with_source_code_references
from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud

from hooli_demo_assets.assets.sling import my_sling_assets
from hooli_demo_assets.jobs import daily_sling_job
from hooli_demo_assets.resources import sling_resource
from hooli_demo_assets.schedules import daily_sling_assets
from hooli_data_ingest.assets.sling import my_sling_assets
from hooli_data_ingest.jobs import daily_sling_job
from hooli_data_ingest.resources import sling_resource
from hooli_data_ingest.schedules import daily_sling_assets


defs = Definitions(
assets=link_code_references_to_git_if_cloud(
with_source_code_references([my_sling_assets]),
file_path_mapping=AnchorBasedFilePathMapping(
local_file_anchor=Path(__file__),
file_anchor_path_in_repository="hooli-demo-assets/hooli_demo_assets/definitions.py",
file_anchor_path_in_repository="hooli-data-ingest/hooli_data_ingest/definitions.py",
),
),
schedules=[daily_sling_assets],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ def get_env():

# Paths for local dev
current_file_path = Path(__file__)
hooli_demo_root = current_file_path.parent.parent.parent
project_root = hooli_demo_root.parent
hooli_data_ingest_root = current_file_path.parent.parent.parent
project_root = hooli_data_ingest_root.parent
DUCKDB_PATH = project_root / "dbt_project" / "example.duckdb"
LOCATIONS_CSV_PATH = f"file://{hooli_demo_root}/locations.csv"
LOCATIONS_CSV_PATH = f"file://{hooli_data_ingest_root}/locations.csv"


if get_env() == "LOCAL":
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dagster import ScheduleDefinition
from hooli_demo_assets.jobs import daily_sling_job
from hooli_data_ingest.jobs import daily_sling_job


daily_sling_assets = ScheduleDefinition(
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion hooli-demo-assets/setup.py → hooli-data-ingest/setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import find_packages, setup

setup(
name="hooli_demo_assets",
name="hooli_data_ingest",
packages=find_packages(),
install_requires=[
"dagster",
Expand Down
1 change: 0 additions & 1 deletion hooli-demo-assets/hooli_demo_assets/__init__.py

This file was deleted.

20 changes: 13 additions & 7 deletions hooli_basics/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@
Definitions,
with_source_code_references,
)
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._core.definitions.tags import build_kind_tag
from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud
from pandas import DataFrame, read_html, get_dummies, to_numeric
from sklearn.linear_model import LinearRegression as Regression

@asset(
compute_kind="Kubernetes",
tags={**StorageKindTagSet(storage_kind="S3")},
tags={
**build_kind_tag("Kubernetes"),
**build_kind_tag("S3"),
},
)
def country_stats() -> DataFrame:
df = read_html("https://tinyurl.com/mry64ebh", flavor='html5lib')[0]
Expand All @@ -38,17 +40,21 @@ def check_country_stats(country_stats):
return AssetCheckResult(passed=True)

@asset(
compute_kind="Kubernetes",
tags={**StorageKindTagSet(storage_kind="S3")},
tags={
**build_kind_tag("Kubernetes"),
**build_kind_tag("S3"),
},
)
def change_model(country_stats: DataFrame) -> Regression:
data = country_stats.dropna(subset=["pop_change"])
dummies = get_dummies(data[["continent"]])
return Regression().fit(dummies, data["pop_change"])

@asset(
compute_kind="Kubernetes",
tags={**StorageKindTagSet(storage_kind="S3")},
tags={
**build_kind_tag("Kubernetes"),
**build_kind_tag("S3"),
},
)
def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataFrame:
result = country_stats.groupby("continent").sum()
Expand Down
13 changes: 8 additions & 5 deletions hooli_batch_enrichment/dagster_batch_enrichment/assets.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json

from dagster import asset, OpExecutionContext ,MetadataValue, DynamicOut, Config, op, DynamicOutput, Out, graph_asset, RetryPolicy, Config
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._core.definitions.tags import build_kind_tag
import pandas as pd
from pydantic import Field
import numpy as np
Expand All @@ -17,8 +17,10 @@ class experimentConfig(Config):
)

@asset(
compute_kind="Kubernetes",
tags={**StorageKindTagSet(storage_kind="S3")},
tags={
**build_kind_tag("Kubernetes"),
**build_kind_tag("S3"),
},
)
def raw_data(
context: OpExecutionContext,
Expand Down Expand Up @@ -92,8 +94,9 @@ def concat_chunk_list(chunks) -> pd.DataFrame:

@graph_asset(
tags={
**StorageKindTagSet(storage_kind="S3"),
},
**build_kind_tag("Kubernetes"),
**build_kind_tag("S3"),
},
)
def enriched_data(raw_data) -> pd.DataFrame:
"""Full enrichment process"""
Expand Down
44 changes: 27 additions & 17 deletions hooli_data_eng/assets/forecasting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,17 @@

from dagster import (
asset,
AssetKey,
AssetIn,
MonthlyPartitionsDefinition,
Output,
Field,
Config,
AssetExecutionContext,
MaterializeResult,
SourceAsset,
)
from dagster_k8s import PipesK8sClient
from dagstermill import define_dagstermill_asset
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._core.definitions.tags import build_kind_tag
from dagster._utils import file_relative_path
from dagster_databricks import PipesDatabricksClient
from databricks.sdk.service import jobs
Expand Down Expand Up @@ -64,9 +62,11 @@ class modelHyperParams(Config):

@asset(
ins={"weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"])},
compute_kind="scikitlearn",
io_manager_key="model_io_manager",
tags={**StorageKindTagSet(storage_kind="s3")},
tags={
**build_kind_tag("scikitlearn"),
**build_kind_tag("s3"),
},
)
def order_forecast_model(
context, weekly_order_summary: pd.DataFrame, config: modelHyperParams
Expand Down Expand Up @@ -97,12 +97,14 @@ def order_forecast_model(
"weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"]),
"order_forecast_model": AssetIn(),
},
compute_kind="scikitlearn",
key_prefix=["forecasting"],
io_manager_key="model_io_manager",
partitions_def=MonthlyPartitionsDefinition(start_date="2022-01-01"),
tags={"core_kpis":"",
**StorageKindTagSet(storage_kind=storage_kind)},
tags={
"core_kpis":"",
**build_kind_tag("scikitlearn"),
**build_kind_tag(storage_kind),
},
)
def model_stats_by_month(
context,
Expand Down Expand Up @@ -139,9 +141,11 @@ def model_stats_by_month(
"weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"]),
"order_forecast_model": AssetIn(),
},
compute_kind="pandas",
key_prefix=["FORECASTING"],
tags={**StorageKindTagSet(storage_kind=storage_kind)},
tags={
**build_kind_tag("pandas"),
**build_kind_tag(storage_kind),
},
)
def predicted_orders(
weekly_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float]
Expand All @@ -165,11 +169,13 @@ def predicted_orders(
# for building a databricks cluster
@asset(
ins={"predicted_orders": AssetIn(key_prefix=["FORECASTING"])},
compute_kind="pyspark",
key_prefix=["FORECASTING"],
required_resource_keys={"step_launcher", "pyspark"},
metadata={"resource_constrained_at": 50},
tags={**StorageKindTagSet(storage_kind="databricks")},
tags={
**build_kind_tag("pyspark"),
**build_kind_tag("databricks"),
},
)
def big_orders(context, predicted_orders: pd.DataFrame):
"""Days where predicted orders surpass our current carrying capacity"""
Expand All @@ -189,7 +195,7 @@ def big_orders(context, predicted_orders: pd.DataFrame):
"order_forecast_model": AssetIn(),
},
required_resource_keys={"io_manager"},
asset_tags={**StorageKindTagSet(storage_kind="S3")},
asset_tags={**build_kind_tag("S3")},
)


Expand All @@ -198,8 +204,10 @@ def big_orders(context, predicted_orders: pd.DataFrame):
# or use that upstream Snowflake table, it is used here for illustrative purposes
@asset(
deps=[predicted_orders],
compute_kind="databricks",
tags={**StorageKindTagSet(storage_kind="databricks")},
tags={
**build_kind_tag("pyspark"),
**build_kind_tag("databricks"),
},
)
def databricks_asset(
context: AssetExecutionContext,
Expand Down Expand Up @@ -247,8 +255,10 @@ def databricks_asset(
# or use that upstream Snowflake table, it is used here for illustrative purposes
@asset(
deps=[predicted_orders],
compute_kind="kubernetes",
tags={**StorageKindTagSet(storage_kind="S3")},
tags={
**build_kind_tag("kubernetes"),
**build_kind_tag("S3"),
},
)
def k8s_pod_asset(
context: AssetExecutionContext,
Expand Down
20 changes: 13 additions & 7 deletions hooli_data_eng/assets/marketing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
ScheduleDefinition,
AssetSelection
)
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._core.definitions.tags import build_kind_tag
from dagster_cloud.anomaly_detection import build_anomaly_detection_freshness_checks
import pandas as pd
from hooli_data_eng.utils.storage_kind_helpers import get_storage_kind
Expand All @@ -31,10 +31,12 @@
@asset(
key_prefix="MARKETING",
automation_condition=AutomationCondition.on_cron('0 0 1-31/2 * *'),
compute_kind="pandas",
owners=["team:programmers", "[email protected]"],
ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])},
tags={**StorageKindTagSet(storage_kind=storage_kind)},
tags={
**build_kind_tag("pandas"),
**build_kind_tag(storage_kind),
},
)
def avg_orders(
context: AssetExecutionContext, company_perf: pd.DataFrame
Expand All @@ -57,10 +59,12 @@ def check_avg_orders(context, avg_orders: pd.DataFrame):

@asset(
key_prefix="MARKETING",
compute_kind="pandas",
owners=["team:programmers"],
ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])},
tags={**StorageKindTagSet(storage_kind=storage_kind)},
tags={
**build_kind_tag("pandas"),
**build_kind_tag(storage_kind),
},
)
def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame:
"""Computes min order KPI"""
Expand All @@ -77,10 +81,12 @@ def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame:
@asset(
partitions_def=product_skus,
io_manager_key="model_io_manager",
compute_kind="hex",
key_prefix="MARKETING",
ins={"sku_stats": AssetIn(key_prefix=["ANALYTICS"])},
tags={**StorageKindTagSet(storage_kind="s3")},
tags={
**build_kind_tag("hex"),
**build_kind_tag("s3"),
},
)
def key_product_deepdive(context, sku_stats):
"""Creates a file for a BI tool based on the current quarters top product, represented as a dynamic partition"""
Expand Down
Loading

0 comments on commit 7cafaba

Please sign in to comment.