Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed storage kind tags #126

Merged
merged 4 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions .github/workflows/deploy-dagster-cloud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -153,23 +153,23 @@ jobs:
with:
command: "ci set-build-output --location-name=snowflake_insights --image-tag=$IMAGE_TAG-snowflake-insights"

# Build 'demo_assets' code location
- name: Build and upload Docker image for demo_assets
# Build 'hooli_data_ingest' code location
- name: Build and upload Docker image for hooli_data_ingest
if: steps.prerun.outputs.result != 'skip'
uses: docker/build-push-action@v5
with:
context: ./hooli-demo-assets
context: ./hooli-data-ingest
push: true
tags: ${{ env.IMAGE_REGISTRY }}:${{ env.IMAGE_TAG }}-demo-assets
tags: ${{ env.IMAGE_REGISTRY }}:${{ env.IMAGE_TAG }}-hooli-data-ingest
cache-from: type=gha
cache-to: type=gha,mode=max

- name: Update build session with image tag for demo_assets
id: ci-set-build-output-demo-assets
- name: Update build session with image tag for hooli_data_ingest
id: ci-set-build-output-hooli-data-ingest
if: steps.prerun.outputs.result != 'skip'
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected]
with:
command: "ci set-build-output --location-name=demo_assets --image-tag=$IMAGE_TAG-demo-assets"
command: "ci set-build-output --location-name=hooli_data_ingest --image-tag=$IMAGE_TAG-hooli-data-ingest"

# Build pipes example container
- name: Build and upload Docker image for pipes example
Expand Down
6 changes: 3 additions & 3 deletions dagster_cloud.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ locations:
directory: ./hooli_snowflake_insights
registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod

- location_name: demo_assets
- location_name: hooli_data_ingest
code_source:
package_name: hooli_demo_assets
package_name: hooli_data_ingest
build:
directory: ./hooli-demo-assets
directory: ./hooli-data-ingest
registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod
File renamed without changes.
1 change: 1 addition & 0 deletions hooli-data-ingest/hooli_data_ingest/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from hooli_data_ingest.definitions import defs as defs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._core.definitions.tags import build_kind_tag
from dagster_embedded_elt.sling import (
sling_assets,
SlingResource,
)
from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator

from hooli_demo_assets.resources import replication_config
from hooli_data_ingest.resources import replication_config


class CustomSlingTranslator(DagsterSlingTranslator):
Expand All @@ -21,7 +21,7 @@ def get_tags(self, stream_definition):
storage_kind = self.replication_config.get("target", "DUCKDB")
if storage_kind.startswith("SNOWFLAKE"):
storage_kind = "SNOWFLAKE"
return {**StorageKindTagSet(storage_kind=storage_kind)}
return {**build_kind_tag(storage_kind)}


@sling_assets(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@
from dagster._core.definitions.metadata import with_source_code_references
from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud

from hooli_demo_assets.assets.sling import my_sling_assets
from hooli_demo_assets.jobs import daily_sling_job
from hooli_demo_assets.resources import sling_resource
from hooli_demo_assets.schedules import daily_sling_assets
from hooli_data_ingest.assets.sling import my_sling_assets
from hooli_data_ingest.jobs import daily_sling_job
from hooli_data_ingest.resources import sling_resource
from hooli_data_ingest.schedules import daily_sling_assets


defs = Definitions(
assets=link_code_references_to_git_if_cloud(
with_source_code_references([my_sling_assets]),
file_path_mapping=AnchorBasedFilePathMapping(
local_file_anchor=Path(__file__),
file_anchor_path_in_repository="hooli-demo-assets/hooli_demo_assets/definitions.py",
file_anchor_path_in_repository="hooli-data-ingest/hooli_data_ingest/definitions.py",
),
),
schedules=[daily_sling_assets],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ def get_env():

# Paths for local dev
current_file_path = Path(__file__)
hooli_demo_root = current_file_path.parent.parent.parent
project_root = hooli_demo_root.parent
hooli_data_ingest_root = current_file_path.parent.parent.parent
project_root = hooli_data_ingest_root.parent
DUCKDB_PATH = project_root / "dbt_project" / "example.duckdb"
LOCATIONS_CSV_PATH = f"file://{hooli_demo_root}/locations.csv"
LOCATIONS_CSV_PATH = f"file://{hooli_data_ingest_root}/locations.csv"


if get_env() == "LOCAL":
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dagster import ScheduleDefinition
from hooli_demo_assets.jobs import daily_sling_job
from hooli_data_ingest.jobs import daily_sling_job


daily_sling_assets = ScheduleDefinition(
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion hooli-demo-assets/setup.py → hooli-data-ingest/setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import find_packages, setup

setup(
name="hooli_demo_assets",
name="hooli_data_ingest",
packages=find_packages(),
install_requires=[
"dagster",
Expand Down
1 change: 0 additions & 1 deletion hooli-demo-assets/hooli_demo_assets/__init__.py

This file was deleted.

20 changes: 13 additions & 7 deletions hooli_basics/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@
Definitions,
with_source_code_references,
)
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._core.definitions.tags import build_kind_tag
from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud
from pandas import DataFrame, read_html, get_dummies, to_numeric
from sklearn.linear_model import LinearRegression as Regression

@asset(
compute_kind="Kubernetes",
tags={**StorageKindTagSet(storage_kind="S3")},
tags={
**build_kind_tag("Kubernetes"),
**build_kind_tag("S3"),
},
)
def country_stats() -> DataFrame:
df = read_html("https://tinyurl.com/mry64ebh", flavor='html5lib')[0]
Expand All @@ -38,17 +40,21 @@ def check_country_stats(country_stats):
return AssetCheckResult(passed=True)

@asset(
compute_kind="Kubernetes",
tags={**StorageKindTagSet(storage_kind="S3")},
tags={
**build_kind_tag("Kubernetes"),
**build_kind_tag("S3"),
},
)
def change_model(country_stats: DataFrame) -> Regression:
data = country_stats.dropna(subset=["pop_change"])
dummies = get_dummies(data[["continent"]])
return Regression().fit(dummies, data["pop_change"])

@asset(
compute_kind="Kubernetes",
tags={**StorageKindTagSet(storage_kind="S3")},
tags={
**build_kind_tag("Kubernetes"),
**build_kind_tag("S3"),
},
)
def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataFrame:
result = country_stats.groupby("continent").sum()
Expand Down
13 changes: 8 additions & 5 deletions hooli_batch_enrichment/dagster_batch_enrichment/assets.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json

from dagster import asset, OpExecutionContext ,MetadataValue, DynamicOut, Config, op, DynamicOutput, Out, graph_asset, RetryPolicy, Config
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._core.definitions.tags import build_kind_tag
import pandas as pd
from pydantic import Field
import numpy as np
Expand All @@ -17,8 +17,10 @@ class experimentConfig(Config):
)

@asset(
compute_kind="Kubernetes",
tags={**StorageKindTagSet(storage_kind="S3")},
tags={
**build_kind_tag("Kubernetes"),
**build_kind_tag("S3"),
},
)
def raw_data(
context: OpExecutionContext,
Expand Down Expand Up @@ -92,8 +94,9 @@ def concat_chunk_list(chunks) -> pd.DataFrame:

@graph_asset(
tags={
**StorageKindTagSet(storage_kind="S3"),
},
**build_kind_tag("Kubernetes"),
**build_kind_tag("S3"),
},
)
def enriched_data(raw_data) -> pd.DataFrame:
"""Full enrichment process"""
Expand Down
44 changes: 27 additions & 17 deletions hooli_data_eng/assets/forecasting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,17 @@

from dagster import (
asset,
AssetKey,
AssetIn,
MonthlyPartitionsDefinition,
Output,
Field,
Config,
AssetExecutionContext,
MaterializeResult,
SourceAsset,
)
from dagster_k8s import PipesK8sClient
from dagstermill import define_dagstermill_asset
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._core.definitions.tags import build_kind_tag
from dagster._utils import file_relative_path
from dagster_databricks import PipesDatabricksClient
from databricks.sdk.service import jobs
Expand Down Expand Up @@ -64,9 +62,11 @@ class modelHyperParams(Config):

@asset(
ins={"weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"])},
compute_kind="scikitlearn",
io_manager_key="model_io_manager",
tags={**StorageKindTagSet(storage_kind="s3")},
tags={
**build_kind_tag("scikitlearn"),
**build_kind_tag("s3"),
},
)
def order_forecast_model(
context, weekly_order_summary: pd.DataFrame, config: modelHyperParams
Expand Down Expand Up @@ -97,12 +97,14 @@ def order_forecast_model(
"weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"]),
"order_forecast_model": AssetIn(),
},
compute_kind="scikitlearn",
key_prefix=["forecasting"],
io_manager_key="model_io_manager",
partitions_def=MonthlyPartitionsDefinition(start_date="2022-01-01"),
tags={"core_kpis":"",
**StorageKindTagSet(storage_kind=storage_kind)},
tags={
"core_kpis":"",
**build_kind_tag("scikitlearn"),
**build_kind_tag(storage_kind),
},
)
def model_stats_by_month(
context,
Expand Down Expand Up @@ -139,9 +141,11 @@ def model_stats_by_month(
"weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"]),
"order_forecast_model": AssetIn(),
},
compute_kind="pandas",
key_prefix=["FORECASTING"],
tags={**StorageKindTagSet(storage_kind=storage_kind)},
tags={
**build_kind_tag("pandas"),
**build_kind_tag(storage_kind),
},
)
def predicted_orders(
weekly_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float]
Expand All @@ -165,11 +169,13 @@ def predicted_orders(
# for building a databricks cluster
@asset(
ins={"predicted_orders": AssetIn(key_prefix=["FORECASTING"])},
compute_kind="pyspark",
key_prefix=["FORECASTING"],
required_resource_keys={"step_launcher", "pyspark"},
metadata={"resource_constrained_at": 50},
tags={**StorageKindTagSet(storage_kind="databricks")},
tags={
**build_kind_tag("pyspark"),
**build_kind_tag("databricks"),
},
)
def big_orders(context, predicted_orders: pd.DataFrame):
"""Days where predicted orders surpass our current carrying capacity"""
Expand All @@ -189,7 +195,7 @@ def big_orders(context, predicted_orders: pd.DataFrame):
"order_forecast_model": AssetIn(),
},
required_resource_keys={"io_manager"},
asset_tags={**StorageKindTagSet(storage_kind="S3")},
asset_tags={**build_kind_tag("S3")},
)


Expand All @@ -198,8 +204,10 @@ def big_orders(context, predicted_orders: pd.DataFrame):
# or use that upstream Snowflake table, it is used here for illustrative purposes
@asset(
deps=[predicted_orders],
compute_kind="databricks",
tags={**StorageKindTagSet(storage_kind="databricks")},
tags={
**build_kind_tag("pyspark"),
**build_kind_tag("databricks"),
},
)
def databricks_asset(
context: AssetExecutionContext,
Expand Down Expand Up @@ -247,8 +255,10 @@ def databricks_asset(
# or use that upstream Snowflake table, it is used here for illustrative purposes
@asset(
deps=[predicted_orders],
compute_kind="kubernetes",
tags={**StorageKindTagSet(storage_kind="S3")},
tags={
**build_kind_tag("kubernetes"),
**build_kind_tag("S3"),
},
)
def k8s_pod_asset(
context: AssetExecutionContext,
Expand Down
20 changes: 13 additions & 7 deletions hooli_data_eng/assets/marketing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
ScheduleDefinition,
AssetSelection
)
from dagster._core.definitions.tags import StorageKindTagSet
from dagster._core.definitions.tags import build_kind_tag
from dagster_cloud.anomaly_detection import build_anomaly_detection_freshness_checks
import pandas as pd
from hooli_data_eng.utils.storage_kind_helpers import get_storage_kind
Expand All @@ -31,10 +31,12 @@
@asset(
key_prefix="MARKETING",
automation_condition=AutomationCondition.on_cron('0 0 1-31/2 * *'),
compute_kind="pandas",
owners=["team:programmers", "[email protected]"],
ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])},
tags={**StorageKindTagSet(storage_kind=storage_kind)},
tags={
**build_kind_tag("pandas"),
**build_kind_tag(storage_kind),
},
)
def avg_orders(
context: AssetExecutionContext, company_perf: pd.DataFrame
Expand All @@ -57,10 +59,12 @@ def check_avg_orders(context, avg_orders: pd.DataFrame):

@asset(
key_prefix="MARKETING",
compute_kind="pandas",
owners=["team:programmers"],
ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])},
tags={**StorageKindTagSet(storage_kind=storage_kind)},
tags={
**build_kind_tag("pandas"),
**build_kind_tag(storage_kind),
},
)
def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame:
"""Computes min order KPI"""
Expand All @@ -77,10 +81,12 @@ def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame:
@asset(
partitions_def=product_skus,
io_manager_key="model_io_manager",
compute_kind="hex",
key_prefix="MARKETING",
ins={"sku_stats": AssetIn(key_prefix=["ANALYTICS"])},
tags={**StorageKindTagSet(storage_kind="s3")},
tags={
**build_kind_tag("hex"),
**build_kind_tag("s3"),
},
)
def key_product_deepdive(context, sku_stats):
"""Creates a file for a BI tool based on the current quarters top product, represented as a dynamic partition"""
Expand Down
Loading
Loading