From f836f142f49e7ce1f19bd7d51a71437fec98a5e1 Mon Sep 17 00:00:00 2001 From: izzy Date: Mon, 23 Sep 2024 22:21:39 -0600 Subject: [PATCH 1/4] replaced StorageKindTagSet with build_kind_tag --- .../hooli_demo_assets/assets/sling.py | 4 ++-- hooli_basics/definitions.py | 8 ++++---- .../dagster_batch_enrichment/assets.py | 6 +++--- hooli_data_eng/assets/forecasting/__init__.py | 16 ++++++++-------- hooli_data_eng/assets/marketing/__init__.py | 8 ++++---- hooli_data_eng/assets/raw_data/__init__.py | 6 +++--- 6 files changed, 24 insertions(+), 24 deletions(-) diff --git a/hooli-demo-assets/hooli_demo_assets/assets/sling.py b/hooli-demo-assets/hooli_demo_assets/assets/sling.py index 555a516b..acd1d00c 100644 --- a/hooli-demo-assets/hooli_demo_assets/assets/sling.py +++ b/hooli-demo-assets/hooli_demo_assets/assets/sling.py @@ -1,4 +1,4 @@ -from dagster._core.definitions.tags import StorageKindTagSet +from dagster._core.definitions.tags import build_kind_tag from dagster_embedded_elt.sling import ( sling_assets, SlingResource, @@ -21,7 +21,7 @@ def get_tags(self, stream_definition): storage_kind = self.replication_config.get("target", "DUCKDB") if storage_kind.startswith("SNOWFLAKE"): storage_kind = "SNOWFLAKE" - return {**StorageKindTagSet(storage_kind=storage_kind)} + return {**build_kind_tag(storage_kind)} @sling_assets( diff --git a/hooli_basics/definitions.py b/hooli_basics/definitions.py index a269b972..2d694646 100644 --- a/hooli_basics/definitions.py +++ b/hooli_basics/definitions.py @@ -8,14 +8,14 @@ Definitions, with_source_code_references, ) -from dagster._core.definitions.tags import StorageKindTagSet +from dagster._core.definitions.tags import build_kind_tag from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud from pandas import DataFrame, read_html, get_dummies, to_numeric from sklearn.linear_model import LinearRegression as Regression @asset( compute_kind="Kubernetes", - tags={**StorageKindTagSet(storage_kind="S3")}, + tags={**build_kind_tag("S3")}, ) def country_stats() -> DataFrame: df = read_html("https://tinyurl.com/mry64ebh", flavor='html5lib')[0] @@ -39,7 +39,7 @@ def check_country_stats(country_stats): @asset( compute_kind="Kubernetes", - tags={**StorageKindTagSet(storage_kind="S3")}, + tags={**build_kind_tag("S3")}, ) def change_model(country_stats: DataFrame) -> Regression: data = country_stats.dropna(subset=["pop_change"]) @@ -48,7 +48,7 @@ def change_model(country_stats: DataFrame) -> Regression: @asset( compute_kind="Kubernetes", - tags={**StorageKindTagSet(storage_kind="S3")}, + tags={**build_kind_tag("S3")}, ) def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataFrame: result = country_stats.groupby("continent").sum() diff --git a/hooli_batch_enrichment/dagster_batch_enrichment/assets.py b/hooli_batch_enrichment/dagster_batch_enrichment/assets.py index 023b32a2..48c3da29 100644 --- a/hooli_batch_enrichment/dagster_batch_enrichment/assets.py +++ b/hooli_batch_enrichment/dagster_batch_enrichment/assets.py @@ -1,7 +1,7 @@ import json from dagster import asset, OpExecutionContext ,MetadataValue, DynamicOut, Config, op, DynamicOutput, Out, graph_asset, RetryPolicy, Config -from dagster._core.definitions.tags import StorageKindTagSet +from dagster._core.definitions.tags import build_kind_tag import pandas as pd from pydantic import Field import numpy as np @@ -18,7 +18,7 @@ class experimentConfig(Config): @asset( compute_kind="Kubernetes", - tags={**StorageKindTagSet(storage_kind="S3")}, + tags={**build_kind_tag("S3")}, ) def raw_data( context: OpExecutionContext, @@ -92,7 +92,7 @@ def concat_chunk_list(chunks) -> pd.DataFrame: @graph_asset( tags={ - **StorageKindTagSet(storage_kind="S3"), + **build_kind_tag("S3"), }, ) def enriched_data(raw_data) -> pd.DataFrame: diff --git a/hooli_data_eng/assets/forecasting/__init__.py b/hooli_data_eng/assets/forecasting/__init__.py index d5ec2f49..5775bea6 100644 --- a/hooli_data_eng/assets/forecasting/__init__.py +++ b/hooli_data_eng/assets/forecasting/__init__.py @@ -18,7 +18,7 @@ ) from dagster_k8s import PipesK8sClient from dagstermill import define_dagstermill_asset -from dagster._core.definitions.tags import StorageKindTagSet +from dagster._core.definitions.tags import build_kind_tag from dagster._utils import file_relative_path from dagster_databricks import PipesDatabricksClient from databricks.sdk.service import jobs @@ -66,7 +66,7 @@ class modelHyperParams(Config): ins={"weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"])}, compute_kind="scikitlearn", io_manager_key="model_io_manager", - tags={**StorageKindTagSet(storage_kind="s3")}, + tags={**build_kind_tag("s3")}, ) def order_forecast_model( context, weekly_order_summary: pd.DataFrame, config: modelHyperParams @@ -102,7 +102,7 @@ def order_forecast_model( io_manager_key="model_io_manager", partitions_def=MonthlyPartitionsDefinition(start_date="2022-01-01"), tags={"core_kpis":"", - **StorageKindTagSet(storage_kind=storage_kind)}, + **build_kind_tag(storage_kind)}, ) def model_stats_by_month( context, @@ -141,7 +141,7 @@ def model_stats_by_month( }, compute_kind="pandas", key_prefix=["FORECASTING"], - tags={**StorageKindTagSet(storage_kind=storage_kind)}, + tags={**build_kind_tag(storage_kind)}, ) def predicted_orders( weekly_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float] @@ -169,7 +169,7 @@ def predicted_orders( key_prefix=["FORECASTING"], required_resource_keys={"step_launcher", "pyspark"}, metadata={"resource_constrained_at": 50}, - tags={**StorageKindTagSet(storage_kind="databricks")}, + tags={**build_kind_tag("databricks")}, ) def big_orders(context, predicted_orders: pd.DataFrame): """Days where predicted orders surpass our current carrying capacity""" @@ -189,7 +189,7 @@ def big_orders(context, predicted_orders: pd.DataFrame): "order_forecast_model": AssetIn(), }, required_resource_keys={"io_manager"}, - asset_tags={**StorageKindTagSet(storage_kind="S3")}, + asset_tags={**build_kind_tag("S3")}, ) @@ -199,7 +199,7 @@ def big_orders(context, predicted_orders: pd.DataFrame): @asset( deps=[predicted_orders], compute_kind="databricks", - tags={**StorageKindTagSet(storage_kind="databricks")}, + tags={**build_kind_tag("databricks")}, ) def databricks_asset( context: AssetExecutionContext, @@ -248,7 +248,7 @@ def databricks_asset( @asset( deps=[predicted_orders], compute_kind="kubernetes", - tags={**StorageKindTagSet(storage_kind="S3")}, + tags={**build_kind_tag("S3")}, ) def k8s_pod_asset( context: AssetExecutionContext, diff --git a/hooli_data_eng/assets/marketing/__init__.py b/hooli_data_eng/assets/marketing/__init__.py index ff53049c..914a13ab 100644 --- a/hooli_data_eng/assets/marketing/__init__.py +++ b/hooli_data_eng/assets/marketing/__init__.py @@ -16,7 +16,7 @@ ScheduleDefinition, AssetSelection ) -from dagster._core.definitions.tags import StorageKindTagSet +from dagster._core.definitions.tags import build_kind_tag from dagster_cloud.anomaly_detection import build_anomaly_detection_freshness_checks import pandas as pd from hooli_data_eng.utils.storage_kind_helpers import get_storage_kind @@ -34,7 +34,7 @@ compute_kind="pandas", owners=["team:programmers", "lopp@dagsterlabs.com"], ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])}, - tags={**StorageKindTagSet(storage_kind=storage_kind)}, + tags={**build_kind_tag(storage_kind)}, ) def avg_orders( context: AssetExecutionContext, company_perf: pd.DataFrame @@ -60,7 +60,7 @@ def check_avg_orders(context, avg_orders: pd.DataFrame): compute_kind="pandas", owners=["team:programmers"], ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])}, - tags={**StorageKindTagSet(storage_kind=storage_kind)}, + tags={**build_kind_tag(storage_kind)}, ) def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame: """Computes min order KPI""" @@ -80,7 +80,7 @@ def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame: compute_kind="hex", key_prefix="MARKETING", ins={"sku_stats": AssetIn(key_prefix=["ANALYTICS"])}, - tags={**StorageKindTagSet(storage_kind="s3")}, + tags={**build_kind_tag("s3")}, ) def key_product_deepdive(context, sku_stats): """Creates a file for a BI tool based on the current quarters top product, represented as a dynamic partition""" diff --git a/hooli_data_eng/assets/raw_data/__init__.py b/hooli_data_eng/assets/raw_data/__init__.py index 9c4262a2..d8b7c76b 100644 --- a/hooli_data_eng/assets/raw_data/__init__.py +++ b/hooli_data_eng/assets/raw_data/__init__.py @@ -15,7 +15,7 @@ MetadataValue, RetryPolicy, ) -from dagster._core.definitions.tags import StorageKindTagSet +from dagster._core.definitions.tags import build_kind_tag import pandas as pd from hooli_data_eng.resources.api import RawDataAPI @@ -45,7 +45,7 @@ def _daily_partition_seq(start, end): metadata={"partition_expr": "created_at"}, backfill_policy=BackfillPolicy.single_run(), tags={"core_kpis":"", - **StorageKindTagSet(storage_kind=storage_kind)}, + **build_kind_tag(storage_kind)}, ) def users(context, api: RawDataAPI) -> pd.DataFrame: """A table containing all users data""" @@ -92,7 +92,7 @@ def check_users(context, users: pd.DataFrame): jitter=Jitter.FULL ), backfill_policy=BackfillPolicy.single_run(), - tags={**StorageKindTagSet(storage_kind=storage_kind)}, + tags={**build_kind_tag(storage_kind)}, ) def orders(context, api: RawDataAPI) -> pd.DataFrame: """A table containing all orders that have been placed""" From ab18aaa181a6b6e3d557285db3c62168f06bb4bb Mon Sep 17 00:00:00 2001 From: izzy Date: Tue, 24 Sep 2024 13:50:32 -0600 Subject: [PATCH 2/4] removed compute_kind parameter --- hooli_basics/definitions.py | 18 ++++++--- .../dagster_batch_enrichment/assets.py | 9 +++-- hooli_data_eng/assets/forecasting/__init__.py | 40 ++++++++++++------- hooli_data_eng/assets/marketing/__init__.py | 18 ++++++--- hooli_data_eng/assets/raw_data/__init__.py | 11 +++-- 5 files changed, 62 insertions(+), 34 deletions(-) diff --git a/hooli_basics/definitions.py b/hooli_basics/definitions.py index 2d694646..52628d1a 100644 --- a/hooli_basics/definitions.py +++ b/hooli_basics/definitions.py @@ -14,8 +14,10 @@ from sklearn.linear_model import LinearRegression as Regression @asset( - compute_kind="Kubernetes", - tags={**build_kind_tag("S3")}, + tags={ + **build_kind_tag("Kubernetes"), + **build_kind_tag("S3"), + }, ) def country_stats() -> DataFrame: df = read_html("https://tinyurl.com/mry64ebh", flavor='html5lib')[0] @@ -38,8 +40,10 @@ def check_country_stats(country_stats): return AssetCheckResult(passed=True) @asset( - compute_kind="Kubernetes", - tags={**build_kind_tag("S3")}, + tags={ + **build_kind_tag("Kubernetes"), + **build_kind_tag("S3"), + }, ) def change_model(country_stats: DataFrame) -> Regression: data = country_stats.dropna(subset=["pop_change"]) @@ -47,8 +51,10 @@ def change_model(country_stats: DataFrame) -> Regression: return Regression().fit(dummies, data["pop_change"]) @asset( - compute_kind="Kubernetes", - tags={**build_kind_tag("S3")}, + tags={ + **build_kind_tag("Kubernetes"), + **build_kind_tag("S3"), + }, ) def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataFrame: result = country_stats.groupby("continent").sum() diff --git a/hooli_batch_enrichment/dagster_batch_enrichment/assets.py b/hooli_batch_enrichment/dagster_batch_enrichment/assets.py index 48c3da29..5c3f3f5a 100644 --- a/hooli_batch_enrichment/dagster_batch_enrichment/assets.py +++ b/hooli_batch_enrichment/dagster_batch_enrichment/assets.py @@ -17,8 +17,10 @@ class experimentConfig(Config): ) @asset( - compute_kind="Kubernetes", - tags={**build_kind_tag("S3")}, + tags={ + **build_kind_tag("Kubernetes"), + **build_kind_tag("S3"), + }, ) def raw_data( context: OpExecutionContext, @@ -92,8 +94,9 @@ def concat_chunk_list(chunks) -> pd.DataFrame: @graph_asset( tags={ + **build_kind_tag("Kubernetes"), **build_kind_tag("S3"), - }, + }, ) def enriched_data(raw_data) -> pd.DataFrame: """Full enrichment process""" diff --git a/hooli_data_eng/assets/forecasting/__init__.py b/hooli_data_eng/assets/forecasting/__init__.py index 5775bea6..cfe3394f 100644 --- a/hooli_data_eng/assets/forecasting/__init__.py +++ b/hooli_data_eng/assets/forecasting/__init__.py @@ -6,7 +6,6 @@ from dagster import ( asset, - AssetKey, AssetIn, MonthlyPartitionsDefinition, Output, @@ -14,7 +13,6 @@ Config, AssetExecutionContext, MaterializeResult, - SourceAsset, ) from dagster_k8s import PipesK8sClient from dagstermill import define_dagstermill_asset @@ -64,9 +62,11 @@ class modelHyperParams(Config): @asset( ins={"weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"])}, - compute_kind="scikitlearn", io_manager_key="model_io_manager", - tags={**build_kind_tag("s3")}, + tags={ + **build_kind_tag("scikitlearn"), + **build_kind_tag("s3"), + }, ) def order_forecast_model( context, weekly_order_summary: pd.DataFrame, config: modelHyperParams @@ -97,12 +97,14 @@ def order_forecast_model( "weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"]), "order_forecast_model": AssetIn(), }, - compute_kind="scikitlearn", key_prefix=["forecasting"], io_manager_key="model_io_manager", partitions_def=MonthlyPartitionsDefinition(start_date="2022-01-01"), - tags={"core_kpis":"", - **build_kind_tag(storage_kind)}, + tags={ + "core_kpis":"", + **build_kind_tag("scikitlearn"), + **build_kind_tag(storage_kind), + }, ) def model_stats_by_month( context, @@ -139,9 +141,11 @@ def model_stats_by_month( "weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"]), "order_forecast_model": AssetIn(), }, - compute_kind="pandas", key_prefix=["FORECASTING"], - tags={**build_kind_tag(storage_kind)}, + tags={ + **build_kind_tag("pandas"), + **build_kind_tag(storage_kind), + }, ) def predicted_orders( weekly_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float] @@ -165,11 +169,13 @@ def predicted_orders( # for building a databricks cluster @asset( ins={"predicted_orders": AssetIn(key_prefix=["FORECASTING"])}, - compute_kind="pyspark", key_prefix=["FORECASTING"], required_resource_keys={"step_launcher", "pyspark"}, metadata={"resource_constrained_at": 50}, - tags={**build_kind_tag("databricks")}, + tags={ + **build_kind_tag("pyspark"), + **build_kind_tag("databricks"), + }, ) def big_orders(context, predicted_orders: pd.DataFrame): """Days where predicted orders surpass our current carrying capacity""" @@ -198,8 +204,10 @@ def big_orders(context, predicted_orders: pd.DataFrame): # or use that upstream Snowflake table, it is used here for illustrative purposes @asset( deps=[predicted_orders], - compute_kind="databricks", - tags={**build_kind_tag("databricks")}, + tags={ + **build_kind_tag("pyspark"), + **build_kind_tag("databricks"), + }, ) def databricks_asset( context: AssetExecutionContext, @@ -247,8 +255,10 @@ def databricks_asset( # or use that upstream Snowflake table, it is used here for illustrative purposes @asset( deps=[predicted_orders], - compute_kind="kubernetes", - tags={**build_kind_tag("S3")}, + tags={ + **build_kind_tag("kubernetes"), + **build_kind_tag("S3"), + }, ) def k8s_pod_asset( context: AssetExecutionContext, diff --git a/hooli_data_eng/assets/marketing/__init__.py b/hooli_data_eng/assets/marketing/__init__.py index 914a13ab..a935292a 100644 --- a/hooli_data_eng/assets/marketing/__init__.py +++ b/hooli_data_eng/assets/marketing/__init__.py @@ -31,10 +31,12 @@ @asset( key_prefix="MARKETING", automation_condition=AutomationCondition.on_cron('0 0 1-31/2 * *'), - compute_kind="pandas", owners=["team:programmers", "lopp@dagsterlabs.com"], ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])}, - tags={**build_kind_tag(storage_kind)}, + tags={ + **build_kind_tag("pandas"), + **build_kind_tag(storage_kind), + }, ) def avg_orders( context: AssetExecutionContext, company_perf: pd.DataFrame @@ -57,10 +59,12 @@ def check_avg_orders(context, avg_orders: pd.DataFrame): @asset( key_prefix="MARKETING", - compute_kind="pandas", owners=["team:programmers"], ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])}, - tags={**build_kind_tag(storage_kind)}, + tags={ + **build_kind_tag("pandas"), + **build_kind_tag(storage_kind), + }, ) def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame: """Computes min order KPI""" @@ -77,10 +81,12 @@ def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame: @asset( partitions_def=product_skus, io_manager_key="model_io_manager", - compute_kind="hex", key_prefix="MARKETING", ins={"sku_stats": AssetIn(key_prefix=["ANALYTICS"])}, - tags={**build_kind_tag("s3")}, + tags={ + **build_kind_tag("hex"), + **build_kind_tag("s3"), + }, ) def key_product_deepdive(context, sku_stats): """Creates a file for a BI tool based on the current quarters top product, represented as a dynamic partition""" diff --git a/hooli_data_eng/assets/raw_data/__init__.py b/hooli_data_eng/assets/raw_data/__init__.py index d8b7c76b..ce34d608 100644 --- a/hooli_data_eng/assets/raw_data/__init__.py +++ b/hooli_data_eng/assets/raw_data/__init__.py @@ -40,12 +40,13 @@ def _daily_partition_seq(start, end): @asset( - compute_kind="api", partitions_def=daily_partitions, metadata={"partition_expr": "created_at"}, backfill_policy=BackfillPolicy.single_run(), tags={"core_kpis":"", - **build_kind_tag(storage_kind)}, + **build_kind_tag("api"), + **build_kind_tag(storage_kind), + }, ) def users(context, api: RawDataAPI) -> pd.DataFrame: """A table containing all users data""" @@ -82,7 +83,6 @@ def check_users(context, users: pd.DataFrame): ) @asset( - compute_kind="api", partitions_def=daily_partitions, metadata={"partition_expr": "DT"}, retry_policy=RetryPolicy( @@ -92,7 +92,10 @@ def check_users(context, users: pd.DataFrame): jitter=Jitter.FULL ), backfill_policy=BackfillPolicy.single_run(), - tags={**build_kind_tag(storage_kind)}, + tags={ + **build_kind_tag("api"), + **build_kind_tag(storage_kind), + }, ) def orders(context, api: RawDataAPI) -> pd.DataFrame: """A table containing all orders that have been placed""" From ae23d3f673e4dcec8e4970beec321a6811abb641 Mon Sep 17 00:00:00 2001 From: izzy Date: Tue, 24 Sep 2024 14:13:06 -0600 Subject: [PATCH 3/4] renamed demo_assets to hoolid_data_ingest --- .github/workflows/deploy-dagster-cloud.yml | 14 +++++++------- dagster_cloud.yaml | 6 +++--- .../Dockerfile | 0 .../hooli_data_ingest}/__init__.py | 0 .../hooli_data_ingest}/assets/sling.py | 0 .../hooli_data_ingest}/definitions.py | 0 .../hooli_data_ingest}/jobs/__init__.py | 0 .../hooli_data_ingest}/resources/__init__.py | 0 .../hooli_data_ingest}/schedules/__init__.py | 0 .../locations.csv | 0 {hooli-demo-assets => hooli-data-ingest}/setup.py | 2 +- 11 files changed, 11 insertions(+), 11 deletions(-) rename {hooli-demo-assets => hooli-data-ingest}/Dockerfile (100%) rename {hooli-demo-assets/hooli_demo_assets => hooli-data-ingest/hooli_data_ingest}/__init__.py (100%) rename {hooli-demo-assets/hooli_demo_assets => hooli-data-ingest/hooli_data_ingest}/assets/sling.py (100%) rename {hooli-demo-assets/hooli_demo_assets => hooli-data-ingest/hooli_data_ingest}/definitions.py (100%) rename {hooli-demo-assets/hooli_demo_assets => hooli-data-ingest/hooli_data_ingest}/jobs/__init__.py (100%) rename {hooli-demo-assets/hooli_demo_assets => hooli-data-ingest/hooli_data_ingest}/resources/__init__.py (100%) rename {hooli-demo-assets/hooli_demo_assets => hooli-data-ingest/hooli_data_ingest}/schedules/__init__.py (100%) rename {hooli-demo-assets => hooli-data-ingest}/locations.csv (100%) rename {hooli-demo-assets => hooli-data-ingest}/setup.py (89%) diff --git a/.github/workflows/deploy-dagster-cloud.yml b/.github/workflows/deploy-dagster-cloud.yml index c1e964e0..e7756e56 100644 --- a/.github/workflows/deploy-dagster-cloud.yml +++ b/.github/workflows/deploy-dagster-cloud.yml @@ -153,23 +153,23 @@ jobs: with: command: "ci set-build-output --location-name=snowflake_insights --image-tag=$IMAGE_TAG-snowflake-insights" - # Build 'demo_assets' code location - - name: Build and upload Docker image for demo_assets + # Build 'hooli_data_ingest' code location + - name: Build and upload Docker image for hooli_data_ingest if: steps.prerun.outputs.result != 'skip' uses: docker/build-push-action@v5 with: - context: ./hooli-demo-assets + context: ./hooli-data-ingest push: true - tags: ${{ env.IMAGE_REGISTRY }}:${{ env.IMAGE_TAG }}-demo-assets + tags: ${{ env.IMAGE_REGISTRY }}:${{ env.IMAGE_TAG }}-hooli-data-ingest cache-from: type=gha cache-to: type=gha,mode=max - - name: Update build session with image tag for demo_assets - id: ci-set-build-output-demo-assets + - name: Update build session with image tag for hooli_data_ingest + id: ci-set-build-output-hooli-data-ingest if: steps.prerun.outputs.result != 'skip' uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1.38 with: - command: "ci set-build-output --location-name=demo_assets --image-tag=$IMAGE_TAG-demo-assets" + command: "ci set-build-output --location-name=hooli_data_ingest --image-tag=$IMAGE_TAG-hooli-data-ingest" # Build pipes example container - name: Build and upload Docker image for pipes example diff --git a/dagster_cloud.yaml b/dagster_cloud.yaml index dd6dff1c..a665d46b 100644 --- a/dagster_cloud.yaml +++ b/dagster_cloud.yaml @@ -34,9 +34,9 @@ locations: directory: ./hooli_snowflake_insights registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod - - location_name: demo_assets + - location_name: hooli_data_ingest code_source: - package_name: hooli_demo_assets + package_name: hooli_data_ingest build: - directory: ./hooli-demo-assets + directory: ./hooli-data-ingest registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod diff --git a/hooli-demo-assets/Dockerfile b/hooli-data-ingest/Dockerfile similarity index 100% rename from hooli-demo-assets/Dockerfile rename to hooli-data-ingest/Dockerfile diff --git a/hooli-demo-assets/hooli_demo_assets/__init__.py b/hooli-data-ingest/hooli_data_ingest/__init__.py similarity index 100% rename from hooli-demo-assets/hooli_demo_assets/__init__.py rename to hooli-data-ingest/hooli_data_ingest/__init__.py diff --git a/hooli-demo-assets/hooli_demo_assets/assets/sling.py b/hooli-data-ingest/hooli_data_ingest/assets/sling.py similarity index 100% rename from hooli-demo-assets/hooli_demo_assets/assets/sling.py rename to hooli-data-ingest/hooli_data_ingest/assets/sling.py diff --git a/hooli-demo-assets/hooli_demo_assets/definitions.py b/hooli-data-ingest/hooli_data_ingest/definitions.py similarity index 100% rename from hooli-demo-assets/hooli_demo_assets/definitions.py rename to hooli-data-ingest/hooli_data_ingest/definitions.py diff --git a/hooli-demo-assets/hooli_demo_assets/jobs/__init__.py b/hooli-data-ingest/hooli_data_ingest/jobs/__init__.py similarity index 100% rename from hooli-demo-assets/hooli_demo_assets/jobs/__init__.py rename to hooli-data-ingest/hooli_data_ingest/jobs/__init__.py diff --git a/hooli-demo-assets/hooli_demo_assets/resources/__init__.py b/hooli-data-ingest/hooli_data_ingest/resources/__init__.py similarity index 100% rename from hooli-demo-assets/hooli_demo_assets/resources/__init__.py rename to hooli-data-ingest/hooli_data_ingest/resources/__init__.py diff --git a/hooli-demo-assets/hooli_demo_assets/schedules/__init__.py b/hooli-data-ingest/hooli_data_ingest/schedules/__init__.py similarity index 100% rename from hooli-demo-assets/hooli_demo_assets/schedules/__init__.py rename to hooli-data-ingest/hooli_data_ingest/schedules/__init__.py diff --git a/hooli-demo-assets/locations.csv b/hooli-data-ingest/locations.csv similarity index 100% rename from hooli-demo-assets/locations.csv rename to hooli-data-ingest/locations.csv diff --git a/hooli-demo-assets/setup.py b/hooli-data-ingest/setup.py similarity index 89% rename from hooli-demo-assets/setup.py rename to hooli-data-ingest/setup.py index 12f0d56a..df318799 100644 --- a/hooli-demo-assets/setup.py +++ b/hooli-data-ingest/setup.py @@ -1,7 +1,7 @@ from setuptools import find_packages, setup setup( - name="hooli_demo_assets", + name="hooli_data_ingest", packages=find_packages(), install_requires=[ "dagster", From 9f5e64b6b392a31b13dc63f7ee00c31e4485bc4d Mon Sep 17 00:00:00 2001 From: izzy Date: Tue, 24 Sep 2024 16:07:44 -0600 Subject: [PATCH 4/4] renamed directory and corresponding imports --- hooli-data-ingest/hooli_data_ingest/__init__.py | 2 +- hooli-data-ingest/hooli_data_ingest/assets/sling.py | 2 +- hooli-data-ingest/hooli_data_ingest/definitions.py | 10 +++++----- .../hooli_data_ingest/resources/__init__.py | 6 +++--- .../hooli_data_ingest/schedules/__init__.py | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/hooli-data-ingest/hooli_data_ingest/__init__.py b/hooli-data-ingest/hooli_data_ingest/__init__.py index 59d0a1b2..4be0ed30 100644 --- a/hooli-data-ingest/hooli_data_ingest/__init__.py +++ b/hooli-data-ingest/hooli_data_ingest/__init__.py @@ -1 +1 @@ -from hooli_demo_assets.definitions import defs as defs \ No newline at end of file +from hooli_data_ingest.definitions import defs as defs \ No newline at end of file diff --git a/hooli-data-ingest/hooli_data_ingest/assets/sling.py b/hooli-data-ingest/hooli_data_ingest/assets/sling.py index acd1d00c..943a6d88 100644 --- a/hooli-data-ingest/hooli_data_ingest/assets/sling.py +++ b/hooli-data-ingest/hooli_data_ingest/assets/sling.py @@ -5,7 +5,7 @@ ) from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator -from hooli_demo_assets.resources import replication_config +from hooli_data_ingest.resources import replication_config class CustomSlingTranslator(DagsterSlingTranslator): diff --git a/hooli-data-ingest/hooli_data_ingest/definitions.py b/hooli-data-ingest/hooli_data_ingest/definitions.py index 32c2360d..def2e3e5 100644 --- a/hooli-data-ingest/hooli_data_ingest/definitions.py +++ b/hooli-data-ingest/hooli_data_ingest/definitions.py @@ -8,10 +8,10 @@ from dagster._core.definitions.metadata import with_source_code_references from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud -from hooli_demo_assets.assets.sling import my_sling_assets -from hooli_demo_assets.jobs import daily_sling_job -from hooli_demo_assets.resources import sling_resource -from hooli_demo_assets.schedules import daily_sling_assets +from hooli_data_ingest.assets.sling import my_sling_assets +from hooli_data_ingest.jobs import daily_sling_job +from hooli_data_ingest.resources import sling_resource +from hooli_data_ingest.schedules import daily_sling_assets defs = Definitions( @@ -19,7 +19,7 @@ with_source_code_references([my_sling_assets]), file_path_mapping=AnchorBasedFilePathMapping( local_file_anchor=Path(__file__), - file_anchor_path_in_repository="hooli-demo-assets/hooli_demo_assets/definitions.py", + file_anchor_path_in_repository="hooli-data-ingest/hooli_data_ingest/definitions.py", ), ), schedules=[daily_sling_assets], diff --git a/hooli-data-ingest/hooli_data_ingest/resources/__init__.py b/hooli-data-ingest/hooli_data_ingest/resources/__init__.py index 187fd4be..d8f33b84 100644 --- a/hooli-data-ingest/hooli_data_ingest/resources/__init__.py +++ b/hooli-data-ingest/hooli_data_ingest/resources/__init__.py @@ -16,10 +16,10 @@ def get_env(): # Paths for local dev current_file_path = Path(__file__) -hooli_demo_root = current_file_path.parent.parent.parent -project_root = hooli_demo_root.parent +hooli_data_ingest_root = current_file_path.parent.parent.parent +project_root = hooli_data_ingest_root.parent DUCKDB_PATH = project_root / "dbt_project" / "example.duckdb" -LOCATIONS_CSV_PATH = f"file://{hooli_demo_root}/locations.csv" +LOCATIONS_CSV_PATH = f"file://{hooli_data_ingest_root}/locations.csv" if get_env() == "LOCAL": diff --git a/hooli-data-ingest/hooli_data_ingest/schedules/__init__.py b/hooli-data-ingest/hooli_data_ingest/schedules/__init__.py index 9554a180..8a797469 100644 --- a/hooli-data-ingest/hooli_data_ingest/schedules/__init__.py +++ b/hooli-data-ingest/hooli_data_ingest/schedules/__init__.py @@ -1,5 +1,5 @@ from dagster import ScheduleDefinition -from hooli_demo_assets.jobs import daily_sling_job +from hooli_data_ingest.jobs import daily_sling_job daily_sling_assets = ScheduleDefinition(