diff --git a/.github/workflows/deploy-dagster-cloud.yml b/.github/workflows/deploy-dagster-cloud.yml index c1e964e..e7756e5 100644 --- a/.github/workflows/deploy-dagster-cloud.yml +++ b/.github/workflows/deploy-dagster-cloud.yml @@ -153,23 +153,23 @@ jobs: with: command: "ci set-build-output --location-name=snowflake_insights --image-tag=$IMAGE_TAG-snowflake-insights" - # Build 'demo_assets' code location - - name: Build and upload Docker image for demo_assets + # Build 'hooli_data_ingest' code location + - name: Build and upload Docker image for hooli_data_ingest if: steps.prerun.outputs.result != 'skip' uses: docker/build-push-action@v5 with: - context: ./hooli-demo-assets + context: ./hooli-data-ingest push: true - tags: ${{ env.IMAGE_REGISTRY }}:${{ env.IMAGE_TAG }}-demo-assets + tags: ${{ env.IMAGE_REGISTRY }}:${{ env.IMAGE_TAG }}-hooli-data-ingest cache-from: type=gha cache-to: type=gha,mode=max - - name: Update build session with image tag for demo_assets - id: ci-set-build-output-demo-assets + - name: Update build session with image tag for hooli_data_ingest + id: ci-set-build-output-hooli-data-ingest if: steps.prerun.outputs.result != 'skip' uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.38 with: - command: "ci set-build-output --location-name=demo_assets --image-tag=$IMAGE_TAG-demo-assets" + command: "ci set-build-output --location-name=hooli_data_ingest --image-tag=$IMAGE_TAG-hooli-data-ingest" # Build pipes example container - name: Build and upload Docker image for pipes example diff --git a/dagster_cloud.yaml b/dagster_cloud.yaml index dd6dff1..a665d46 100644 --- a/dagster_cloud.yaml +++ b/dagster_cloud.yaml @@ -34,9 +34,9 @@ locations: directory: ./hooli_snowflake_insights registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod - - location_name: demo_assets + - location_name: hooli_data_ingest code_source: - package_name: hooli_demo_assets + package_name: hooli_data_ingest build: - directory: ./hooli-demo-assets + directory: ./hooli-data-ingest registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod diff --git a/hooli-demo-assets/Dockerfile b/hooli-data-ingest/Dockerfile similarity index 100% rename from hooli-demo-assets/Dockerfile rename to hooli-data-ingest/Dockerfile diff --git a/hooli-data-ingest/hooli_data_ingest/__init__.py b/hooli-data-ingest/hooli_data_ingest/__init__.py new file mode 100644 index 0000000..4be0ed3 --- /dev/null +++ b/hooli-data-ingest/hooli_data_ingest/__init__.py @@ -0,0 +1 @@ +from hooli_data_ingest.definitions import defs as defs \ No newline at end of file diff --git a/hooli-demo-assets/hooli_demo_assets/assets/sling.py b/hooli-data-ingest/hooli_data_ingest/assets/sling.py similarity index 85% rename from hooli-demo-assets/hooli_demo_assets/assets/sling.py rename to hooli-data-ingest/hooli_data_ingest/assets/sling.py index 555a516..943a6d8 100644 --- a/hooli-demo-assets/hooli_demo_assets/assets/sling.py +++ b/hooli-data-ingest/hooli_data_ingest/assets/sling.py @@ -1,11 +1,11 @@ -from dagster._core.definitions.tags import StorageKindTagSet +from dagster._core.definitions.tags import build_kind_tag from dagster_embedded_elt.sling import ( sling_assets, SlingResource, ) from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator -from hooli_demo_assets.resources import replication_config +from hooli_data_ingest.resources import replication_config class CustomSlingTranslator(DagsterSlingTranslator): @@ -21,7 +21,7 @@ def get_tags(self, stream_definition): storage_kind = self.replication_config.get("target", "DUCKDB") if storage_kind.startswith("SNOWFLAKE"): storage_kind = "SNOWFLAKE" - return {**StorageKindTagSet(storage_kind=storage_kind)} + return {**build_kind_tag(storage_kind)} @sling_assets( diff --git a/hooli-demo-assets/hooli_demo_assets/definitions.py b/hooli-data-ingest/hooli_data_ingest/definitions.py similarity index 66% rename from hooli-demo-assets/hooli_demo_assets/definitions.py rename to hooli-data-ingest/hooli_data_ingest/definitions.py index 32c2360..def2e3e 100644 --- a/hooli-demo-assets/hooli_demo_assets/definitions.py +++ b/hooli-data-ingest/hooli_data_ingest/definitions.py @@ -8,10 +8,10 @@ from dagster._core.definitions.metadata import with_source_code_references from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud -from hooli_demo_assets.assets.sling import my_sling_assets -from hooli_demo_assets.jobs import daily_sling_job -from hooli_demo_assets.resources import sling_resource -from hooli_demo_assets.schedules import daily_sling_assets +from hooli_data_ingest.assets.sling import my_sling_assets +from hooli_data_ingest.jobs import daily_sling_job +from hooli_data_ingest.resources import sling_resource +from hooli_data_ingest.schedules import daily_sling_assets defs = Definitions( @@ -19,7 +19,7 @@ with_source_code_references([my_sling_assets]), file_path_mapping=AnchorBasedFilePathMapping( local_file_anchor=Path(__file__), - file_anchor_path_in_repository="hooli-demo-assets/hooli_demo_assets/definitions.py", + file_anchor_path_in_repository="hooli-data-ingest/hooli_data_ingest/definitions.py", ), ), schedules=[daily_sling_assets], diff --git a/hooli-demo-assets/hooli_demo_assets/jobs/__init__.py b/hooli-data-ingest/hooli_data_ingest/jobs/__init__.py similarity index 100% rename from hooli-demo-assets/hooli_demo_assets/jobs/__init__.py rename to hooli-data-ingest/hooli_data_ingest/jobs/__init__.py diff --git a/hooli-demo-assets/hooli_demo_assets/resources/__init__.py b/hooli-data-ingest/hooli_data_ingest/resources/__init__.py similarity index 93% rename from hooli-demo-assets/hooli_demo_assets/resources/__init__.py rename to hooli-data-ingest/hooli_data_ingest/resources/__init__.py index 187fd4b..d8f33b8 100644 --- a/hooli-demo-assets/hooli_demo_assets/resources/__init__.py +++ b/hooli-data-ingest/hooli_data_ingest/resources/__init__.py @@ -16,10 +16,10 @@ def get_env(): # Paths for local dev current_file_path = Path(__file__) -hooli_demo_root = current_file_path.parent.parent.parent -project_root = hooli_demo_root.parent +hooli_data_ingest_root = current_file_path.parent.parent.parent +project_root = hooli_data_ingest_root.parent DUCKDB_PATH = project_root / "dbt_project" / "example.duckdb" -LOCATIONS_CSV_PATH = f"file://{hooli_demo_root}/locations.csv" +LOCATIONS_CSV_PATH = f"file://{hooli_data_ingest_root}/locations.csv" if get_env() == "LOCAL": diff --git a/hooli-demo-assets/hooli_demo_assets/schedules/__init__.py b/hooli-data-ingest/hooli_data_ingest/schedules/__init__.py similarity index 76% rename from hooli-demo-assets/hooli_demo_assets/schedules/__init__.py rename to hooli-data-ingest/hooli_data_ingest/schedules/__init__.py index 9554a18..8a79746 100644 --- a/hooli-demo-assets/hooli_demo_assets/schedules/__init__.py +++ b/hooli-data-ingest/hooli_data_ingest/schedules/__init__.py @@ -1,5 +1,5 @@ from dagster import ScheduleDefinition -from hooli_demo_assets.jobs import daily_sling_job +from hooli_data_ingest.jobs import daily_sling_job daily_sling_assets = ScheduleDefinition( diff --git a/hooli-demo-assets/locations.csv b/hooli-data-ingest/locations.csv similarity index 100% rename from hooli-demo-assets/locations.csv rename to hooli-data-ingest/locations.csv diff --git a/hooli-demo-assets/setup.py b/hooli-data-ingest/setup.py similarity index 89% rename from hooli-demo-assets/setup.py rename to hooli-data-ingest/setup.py index 12f0d56..df31879 100644 --- a/hooli-demo-assets/setup.py +++ b/hooli-data-ingest/setup.py @@ -1,7 +1,7 @@ from setuptools import find_packages, setup setup( - name="hooli_demo_assets", + name="hooli_data_ingest", packages=find_packages(), install_requires=[ "dagster", diff --git a/hooli-demo-assets/hooli_demo_assets/__init__.py b/hooli-demo-assets/hooli_demo_assets/__init__.py deleted file mode 100644 index 59d0a1b..0000000 --- a/hooli-demo-assets/hooli_demo_assets/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from hooli_demo_assets.definitions import defs as defs \ No newline at end of file diff --git a/hooli_basics/definitions.py b/hooli_basics/definitions.py index a269b97..52628d1 100644 --- a/hooli_basics/definitions.py +++ b/hooli_basics/definitions.py @@ -8,14 +8,16 @@ Definitions, with_source_code_references, ) -from dagster._core.definitions.tags import StorageKindTagSet +from dagster._core.definitions.tags import build_kind_tag from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud from pandas import DataFrame, read_html, get_dummies, to_numeric from sklearn.linear_model import LinearRegression as Regression @asset( - compute_kind="Kubernetes", - tags={**StorageKindTagSet(storage_kind="S3")}, + tags={ + **build_kind_tag("Kubernetes"), + **build_kind_tag("S3"), + }, ) def country_stats() -> DataFrame: df = read_html("https://tinyurl.com/mry64ebh", flavor='html5lib')[0] @@ -38,8 +40,10 @@ def check_country_stats(country_stats): return AssetCheckResult(passed=True) @asset( - compute_kind="Kubernetes", - tags={**StorageKindTagSet(storage_kind="S3")}, + tags={ + **build_kind_tag("Kubernetes"), + **build_kind_tag("S3"), + }, ) def change_model(country_stats: DataFrame) -> Regression: data = country_stats.dropna(subset=["pop_change"]) @@ -47,8 +51,10 @@ def change_model(country_stats: DataFrame) -> Regression: return Regression().fit(dummies, data["pop_change"]) @asset( - compute_kind="Kubernetes", - tags={**StorageKindTagSet(storage_kind="S3")}, + tags={ + **build_kind_tag("Kubernetes"), + **build_kind_tag("S3"), + }, ) def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataFrame: result = country_stats.groupby("continent").sum() diff --git a/hooli_batch_enrichment/dagster_batch_enrichment/assets.py b/hooli_batch_enrichment/dagster_batch_enrichment/assets.py index 023b32a..5c3f3f5 100644 --- a/hooli_batch_enrichment/dagster_batch_enrichment/assets.py +++ b/hooli_batch_enrichment/dagster_batch_enrichment/assets.py @@ -1,7 +1,7 @@ import json from dagster import asset, OpExecutionContext ,MetadataValue, DynamicOut, Config, op, DynamicOutput, Out, graph_asset, RetryPolicy, Config -from dagster._core.definitions.tags import StorageKindTagSet +from dagster._core.definitions.tags import build_kind_tag import pandas as pd from pydantic import Field import numpy as np @@ -17,8 +17,10 @@ class experimentConfig(Config): ) @asset( - compute_kind="Kubernetes", - tags={**StorageKindTagSet(storage_kind="S3")}, + tags={ + **build_kind_tag("Kubernetes"), + **build_kind_tag("S3"), + }, ) def raw_data( context: OpExecutionContext, @@ -92,8 +94,9 @@ def concat_chunk_list(chunks) -> pd.DataFrame: @graph_asset( tags={ - **StorageKindTagSet(storage_kind="S3"), - }, + **build_kind_tag("Kubernetes"), + **build_kind_tag("S3"), + }, ) def enriched_data(raw_data) -> pd.DataFrame: """Full enrichment process""" diff --git a/hooli_data_eng/assets/forecasting/__init__.py b/hooli_data_eng/assets/forecasting/__init__.py index d5ec2f4..cfe3394 100644 --- a/hooli_data_eng/assets/forecasting/__init__.py +++ b/hooli_data_eng/assets/forecasting/__init__.py @@ -6,7 +6,6 @@ from dagster import ( asset, - AssetKey, AssetIn, MonthlyPartitionsDefinition, Output, @@ -14,11 +13,10 @@ Config, AssetExecutionContext, MaterializeResult, - SourceAsset, ) from dagster_k8s import PipesK8sClient from dagstermill import define_dagstermill_asset -from dagster._core.definitions.tags import StorageKindTagSet +from dagster._core.definitions.tags import build_kind_tag from dagster._utils import file_relative_path from dagster_databricks import PipesDatabricksClient from databricks.sdk.service import jobs @@ -64,9 +62,11 @@ class modelHyperParams(Config): @asset( ins={"weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"])}, - compute_kind="scikitlearn", io_manager_key="model_io_manager", - tags={**StorageKindTagSet(storage_kind="s3")}, + tags={ + **build_kind_tag("scikitlearn"), + **build_kind_tag("s3"), + }, ) def order_forecast_model( context, weekly_order_summary: pd.DataFrame, config: modelHyperParams @@ -97,12 +97,14 @@ def order_forecast_model( "weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"]), "order_forecast_model": AssetIn(), }, - compute_kind="scikitlearn", key_prefix=["forecasting"], io_manager_key="model_io_manager", partitions_def=MonthlyPartitionsDefinition(start_date="2022-01-01"), - tags={"core_kpis":"", - **StorageKindTagSet(storage_kind=storage_kind)}, + tags={ + "core_kpis":"", + **build_kind_tag("scikitlearn"), + **build_kind_tag(storage_kind), + }, ) def model_stats_by_month( context, @@ -139,9 +141,11 @@ def model_stats_by_month( "weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"]), "order_forecast_model": AssetIn(), }, - compute_kind="pandas", key_prefix=["FORECASTING"], - tags={**StorageKindTagSet(storage_kind=storage_kind)}, + tags={ + **build_kind_tag("pandas"), + **build_kind_tag(storage_kind), + }, ) def predicted_orders( weekly_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float] @@ -165,11 +169,13 @@ def predicted_orders( # for building a databricks cluster @asset( ins={"predicted_orders": AssetIn(key_prefix=["FORECASTING"])}, - compute_kind="pyspark", key_prefix=["FORECASTING"], required_resource_keys={"step_launcher", "pyspark"}, metadata={"resource_constrained_at": 50}, - tags={**StorageKindTagSet(storage_kind="databricks")}, + tags={ + **build_kind_tag("pyspark"), + **build_kind_tag("databricks"), + }, ) def big_orders(context, predicted_orders: pd.DataFrame): """Days where predicted orders surpass our current carrying capacity""" @@ -189,7 +195,7 @@ def big_orders(context, predicted_orders: pd.DataFrame): "order_forecast_model": AssetIn(), }, required_resource_keys={"io_manager"}, - asset_tags={**StorageKindTagSet(storage_kind="S3")}, + asset_tags={**build_kind_tag("S3")}, ) @@ -198,8 +204,10 @@ def big_orders(context, predicted_orders: pd.DataFrame): # or use that upstream Snowflake table, it is used here for illustrative purposes @asset( deps=[predicted_orders], - compute_kind="databricks", - tags={**StorageKindTagSet(storage_kind="databricks")}, + tags={ + **build_kind_tag("pyspark"), + **build_kind_tag("databricks"), + }, ) def databricks_asset( context: AssetExecutionContext, @@ -247,8 +255,10 @@ def databricks_asset( # or use that upstream Snowflake table, it is used here for illustrative purposes @asset( deps=[predicted_orders], - compute_kind="kubernetes", - tags={**StorageKindTagSet(storage_kind="S3")}, + tags={ + **build_kind_tag("kubernetes"), + **build_kind_tag("S3"), + }, ) def k8s_pod_asset( context: AssetExecutionContext, diff --git a/hooli_data_eng/assets/marketing/__init__.py b/hooli_data_eng/assets/marketing/__init__.py index ff53049..a935292 100644 --- a/hooli_data_eng/assets/marketing/__init__.py +++ b/hooli_data_eng/assets/marketing/__init__.py @@ -16,7 +16,7 @@ ScheduleDefinition, AssetSelection ) -from dagster._core.definitions.tags import StorageKindTagSet +from dagster._core.definitions.tags import build_kind_tag from dagster_cloud.anomaly_detection import build_anomaly_detection_freshness_checks import pandas as pd from hooli_data_eng.utils.storage_kind_helpers import get_storage_kind @@ -31,10 +31,12 @@ @asset( key_prefix="MARKETING", automation_condition=AutomationCondition.on_cron('0 0 1-31/2 * *'), - compute_kind="pandas", owners=["team:programmers", "lopp@dagsterlabs.com"], ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])}, - tags={**StorageKindTagSet(storage_kind=storage_kind)}, + tags={ + **build_kind_tag("pandas"), + **build_kind_tag(storage_kind), + }, ) def avg_orders( context: AssetExecutionContext, company_perf: pd.DataFrame @@ -57,10 +59,12 @@ def check_avg_orders(context, avg_orders: pd.DataFrame): @asset( key_prefix="MARKETING", - compute_kind="pandas", owners=["team:programmers"], ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])}, - tags={**StorageKindTagSet(storage_kind=storage_kind)}, + tags={ + **build_kind_tag("pandas"), + **build_kind_tag(storage_kind), + }, ) def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame: """Computes min order KPI""" @@ -77,10 +81,12 @@ def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame: @asset( partitions_def=product_skus, io_manager_key="model_io_manager", - compute_kind="hex", key_prefix="MARKETING", ins={"sku_stats": AssetIn(key_prefix=["ANALYTICS"])}, - tags={**StorageKindTagSet(storage_kind="s3")}, + tags={ + **build_kind_tag("hex"), + **build_kind_tag("s3"), + }, ) def key_product_deepdive(context, sku_stats): """Creates a file for a BI tool based on the current quarters top product, represented as a dynamic partition""" diff --git a/hooli_data_eng/assets/raw_data/__init__.py b/hooli_data_eng/assets/raw_data/__init__.py index 9c4262a..ce34d60 100644 --- a/hooli_data_eng/assets/raw_data/__init__.py +++ b/hooli_data_eng/assets/raw_data/__init__.py @@ -15,7 +15,7 @@ MetadataValue, RetryPolicy, ) -from dagster._core.definitions.tags import StorageKindTagSet +from dagster._core.definitions.tags import build_kind_tag import pandas as pd from hooli_data_eng.resources.api import RawDataAPI @@ -40,12 +40,13 @@ def _daily_partition_seq(start, end): @asset( - compute_kind="api", partitions_def=daily_partitions, metadata={"partition_expr": "created_at"}, backfill_policy=BackfillPolicy.single_run(), tags={"core_kpis":"", - **StorageKindTagSet(storage_kind=storage_kind)}, + **build_kind_tag("api"), + **build_kind_tag(storage_kind), + }, ) def users(context, api: RawDataAPI) -> pd.DataFrame: """A table containing all users data""" @@ -82,7 +83,6 @@ def check_users(context, users: pd.DataFrame): ) @asset( - compute_kind="api", partitions_def=daily_partitions, metadata={"partition_expr": "DT"}, retry_policy=RetryPolicy( @@ -92,7 +92,10 @@ def check_users(context, users: pd.DataFrame): jitter=Jitter.FULL ), backfill_policy=BackfillPolicy.single_run(), - tags={**StorageKindTagSet(storage_kind=storage_kind)}, + tags={ + **build_kind_tag("api"), + **build_kind_tag(storage_kind), + }, ) def orders(context, api: RawDataAPI) -> pd.DataFrame: """A table containing all orders that have been placed"""