From f550709ac26cd7ef0a687181d4332b036d33bdab Mon Sep 17 00:00:00 2001 From: izzy Date: Thu, 16 May 2024 18:15:24 -0600 Subject: [PATCH 1/5] updated sling assets to use the sling_assets decorator --- .../hooli_demo_assets/assets/sling.py | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/hooli-demo-assets/hooli_demo_assets/assets/sling.py b/hooli-demo-assets/hooli_demo_assets/assets/sling.py index ca7ebbff..faf36abc 100644 --- a/hooli-demo-assets/hooli_demo_assets/assets/sling.py +++ b/hooli-demo-assets/hooli_demo_assets/assets/sling.py @@ -1,12 +1,25 @@ -from dagster import AssetSpec -from dagster_embedded_elt.sling import build_sling_asset, SlingMode - - -raw_location = build_sling_asset( - asset_spec=AssetSpec(key=["locations"]), - source_stream="s3://hooli-demo/embedded-elt/", - target_object= "RAW_DATA.LOCATIONS", - mode=SlingMode.FULL_REFRESH, - source_options={"format": "csv"}, - sling_resource_key="s3_to_snowflake_resource" -) \ No newline at end of file +from dagster_embedded_elt.sling import ( + sling_assets, + SlingResource, +) +from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator + +from hooli_demo_assets.resources import replication_config + + +class CustomSlingTranslator(DagsterSlingTranslator): + def __init__(self, target_prefix="RAW_DATA"): + super().__init__(target_prefix=target_prefix) + + def get_group_name(self, stream_definition): + return "RAW_DATA" + + +@sling_assets( + replication_config=replication_config, + dagster_sling_translator=CustomSlingTranslator(), +) +def my_sling_assets(context, sling: SlingResource): + yield from sling.replicate(context=context) + for row in sling.stream_raw_logs(): + context.log.info(row) From fd6c25f1eea5dfcb8ef2e1a58b5dabb7a964cc40 Mon Sep 17 00:00:00 2001 From: izzy Date: Thu, 16 May 2024 18:23:19 -0600 Subject: [PATCH 2/5] updated the definitions to use my_sling_assets --- .../hooli_demo_assets/definitions.py | 24 +++++++++---------- 
.../hooli_demo_assets/jobs/__init__.py | 8 ++++--- .../hooli_demo_assets/schedules/__init__.py | 7 +++--- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/hooli-demo-assets/hooli_demo_assets/definitions.py b/hooli-demo-assets/hooli_demo_assets/definitions.py index 15d61c48..3c6aa84c 100644 --- a/hooli-demo-assets/hooli_demo_assets/definitions.py +++ b/hooli-demo-assets/hooli_demo_assets/definitions.py @@ -1,20 +1,18 @@ from dagster import ( - Definitions, - load_assets_from_modules, - + Definitions, ) -from hooli_demo_assets.assets import sling +from hooli_demo_assets.assets.sling import my_sling_assets from hooli_demo_assets.jobs import daily_sling_job -from hooli_demo_assets.resources import get_env, resource_def -from hooli_demo_assets.schedules import daily_sling_assets +from hooli_demo_assets.resources import sling_resource +from hooli_demo_assets.schedules import daily_sling_assets -sling_assets = load_assets_from_modules([sling], key_prefix="RAW_DATA", group_name="RAW_DATA") - defs = Definitions( - assets=[*sling_assets], - schedules=[daily_sling_assets], - jobs=[daily_sling_job], - resources=resource_def[get_env()], -) \ No newline at end of file + assets=[my_sling_assets], + schedules=[daily_sling_assets], + jobs=[daily_sling_job], + resources={ + "sling": sling_resource + }, +) diff --git a/hooli-demo-assets/hooli_demo_assets/jobs/__init__.py b/hooli-demo-assets/hooli_demo_assets/jobs/__init__.py index 127905b0..c7f8e793 100644 --- a/hooli-demo-assets/hooli_demo_assets/jobs/__init__.py +++ b/hooli-demo-assets/hooli_demo_assets/jobs/__init__.py @@ -1,8 +1,10 @@ from dagster import AssetSelection, define_asset_job + raw_location_by_day = AssetSelection.keys(["RAW_DATA", "locations"]) + daily_sling_job = define_asset_job( - name="daily_sling_job", - selection=raw_location_by_day, -) \ No newline at end of file + name="daily_sling_job", + selection=raw_location_by_day, +) diff --git a/hooli-demo-assets/hooli_demo_assets/schedules/__init__.py 
b/hooli-demo-assets/hooli_demo_assets/schedules/__init__.py index ed84147c..9554a180 100644 --- a/hooli-demo-assets/hooli_demo_assets/schedules/__init__.py +++ b/hooli-demo-assets/hooli_demo_assets/schedules/__init__.py @@ -1,7 +1,8 @@ from dagster import ScheduleDefinition from hooli_demo_assets.jobs import daily_sling_job + daily_sling_assets = ScheduleDefinition( - job=daily_sling_job, - cron_schedule="0 0 * * *", # every day at midnight -) \ No newline at end of file + job=daily_sling_job, + cron_schedule="0 0 * * *", # every day at midnight +) From 9905e286c9757706cb277d6f617043b52c485a5d Mon Sep 17 00:00:00 2001 From: izzy Date: Thu, 16 May 2024 18:21:44 -0600 Subject: [PATCH 3/5] modified the sling resource to use the updated APIs. --- .../hooli_demo_assets/resources/__init__.py | 139 +++++++++++------- 1 file changed, 86 insertions(+), 53 deletions(-) diff --git a/hooli-demo-assets/hooli_demo_assets/resources/__init__.py b/hooli-demo-assets/hooli_demo_assets/resources/__init__.py index 30cd1b0a..42f3bfed 100644 --- a/hooli-demo-assets/hooli_demo_assets/resources/__init__.py +++ b/hooli-demo-assets/hooli_demo_assets/resources/__init__.py @@ -1,66 +1,99 @@ import os +from pathlib import Path from dagster import EnvVar -from pathlib import Path from dagster_embedded_elt.sling import ( - SlingResource, - SlingSourceConnection, - SlingTargetConnection, + SlingResource, + SlingConnectionResource, ) + def get_env(): - if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1": - return "BRANCH" - if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod": - return "PROD" - return "LOCAL" + if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1": + return "BRANCH" + if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod": + return "PROD" + return "LOCAL" + -# embedded elt source and target +# Path for duckdb connection - needed for local dev current_file_path = Path(__file__) parent_dir_path = 
current_file_path.parent.parent.parent.parent DUCKDB_PATH = parent_dir_path / "dbt_project" / "example.duckdb" -source = SlingSourceConnection( - type="s3", - bucket=EnvVar("AWS_S3_BUCKET"), - region=EnvVar("AWS_REGION"), - access_key_id=EnvVar("AWS_ACCESS_KEY_ID"), - secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"), -) -local_target = SlingTargetConnection( - type="duckdb", - instance=f"{DUCKDB_PATH}", -) -branch_target = SlingTargetConnection( - type="snowflake", - host=EnvVar("SNOWFLAKE_HOST"), - database="DEMO_DB2_BRANCH", - user=EnvVar("SNOWFLAKE_USER"), - password=EnvVar("SNOWFLAKE_PASSWORD"), - role=EnvVar("SNOWFLAKE_ROLE"), -) -prod_target = SlingTargetConnection( - type="snowflake", - host=EnvVar("SNOWFLAKE_HOST"), - database="DEMO_DB2", - user=EnvVar("SNOWFLAKE_USER"), - password=EnvVar("SNOWFLAKE_PASSWORD"), - role=EnvVar("SNOWFLAKE_ROLE"), -) -resource_def = { - "LOCAL": { - "s3_to_snowflake_resource": - SlingResource(source_connection=source, target_connection=local_target), - }, - "BRANCH": { - "s3_to_snowflake_resource": SlingResource( - source_connection=source, target_connection=branch_target - ), - }, - "PROD": { - "s3_to_snowflake_resource": SlingResource( - source_connection=source, target_connection=prod_target - ), - }, -} \ No newline at end of file +# Alternatively a replication.yaml file can be used +def create_replication_config(env: str): + # Determine the target database dynamically based on the environment + target_database = { + 'LOCAL': 'DUCKDB', + 'BRANCH': 'SNOWFLAKE_BRANCH', + 'PROD': 'SNOWFLAKE_PROD' + }.get(env, 'DUCKDB') # Default to DUCKDB if no environment matches + + replication_config = { + "source": "S3", + "target": target_database, + "defaults": { + "mode": "full-refresh", + "object": "{stream_file_folder}.{stream_file_name}", + "source_options": { + "format": "csv" + } + }, + "streams": { + "s3://hooli-demo/embedded-elt/locations.csv": { + "object": "locations" + } + } + } + return replication_config + + +def 
create_sling_resource(env: str): + # Dynamically generate connection based on environment + connections = [ + SlingConnectionResource( + name="S3", + type="s3", + bucket=EnvVar("AWS_S3_BUCKET"), + region=EnvVar("AWS_REGION"), + access_key_id=EnvVar("AWS_ACCESS_KEY_ID"), + secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"), + ) + ] + if env == 'LOCAL': + connections.append(SlingConnectionResource( + name="DUCKDB", + type="duckdb", + instance=f"{DUCKDB_PATH}", + schema="RAW_DATA", + )) + elif env == 'BRANCH': + connections.append(SlingConnectionResource( + name="SNOWFLAKE_BRANCH", + type="snowflake", + host=EnvVar("SNOWFLAKE_HOST"), + user=EnvVar("SNOWFLAKE_USER"), + password=EnvVar("SNOWFLAKE_PASSWORD"), + role=EnvVar("SNOWFLAKE_ROLE"), + database="DEMO_DB2_BRANCH", + schema="RAW_DATA", + )) + elif env == 'PROD': + connections.append(SlingConnectionResource( + name="SNOWFLAKE_PROD", + type="snowflake", + host=EnvVar("SNOWFLAKE_HOST"), + user=EnvVar("SNOWFLAKE_USER"), + password=EnvVar("SNOWFLAKE_PASSWORD"), + role=EnvVar("SNOWFLAKE_ROLE"), + database="DEMO_DB2", + schema="RAW_DATA", + )) + return SlingResource(connections=connections) + + +env = get_env() +replication_config = create_replication_config(env) +sling_resource = create_sling_resource(env) From 50569fd991e0e8433046e872d2b685063ad1e676 Mon Sep 17 00:00:00 2001 From: izzy Date: Thu, 16 May 2024 21:27:12 -0600 Subject: [PATCH 4/5] ignore duckdb files --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index db50d931..86c36ee1 100644 --- a/.gitignore +++ b/.gitignore @@ -172,5 +172,5 @@ cython_debug/ tmp*/ # dbt -dbt_project/example.duckdb +dbt_project/example.duckdb* dbt_project/logs From 3024396e28bc0bde640771e4110d36bf5562c43c Mon Sep 17 00:00:00 2001 From: izzy <60406698+izzye84@users.noreply.github.com> Date: Fri, 24 May 2024 09:11:48 -0600 Subject: [PATCH 5/5] Update hooli-demo-assets/hooli_demo_assets/resources/__init__.py MIME-Version: 1.0 
Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adding database param to be consistent with other environments—bug still exists though. Co-authored-by: Christian Minich --- hooli-demo-assets/hooli_demo_assets/resources/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/hooli-demo-assets/hooli_demo_assets/resources/__init__.py b/hooli-demo-assets/hooli_demo_assets/resources/__init__.py index 42f3bfed..bd15a2e5 100644 --- a/hooli-demo-assets/hooli_demo_assets/resources/__init__.py +++ b/hooli-demo-assets/hooli_demo_assets/resources/__init__.py @@ -67,6 +67,7 @@ def create_sling_resource(env: str): name="DUCKDB", type="duckdb", instance=f"{DUCKDB_PATH}", + database="example", schema="RAW_DATA", )) elif env == 'BRANCH':