Skip to content

Commit

Permalink
added embedded elt example
Browse files Browse the repository at this point in the history
  • Loading branch information
izzye84 committed Nov 30, 2023
1 parent 456c72b commit 6249b68
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 4 deletions.
15 changes: 11 additions & 4 deletions dbt_project/models/ANALYTICS/orders_augmented.sql
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
-- Orders enriched with the company of the ordering user and with the state and
-- zip code of that user's location record.
-- NOTE(review): both joins are left joins keyed on user_id; if location_cleaned can
-- hold more than one row per user, orders will fan out. Confirm the grain.
select
    o.*,
    u.company,
    l.state,
    -- location_cleaned exposes this column as zip_code (renamed from l_zip_code),
    -- so it has to be referenced through the l alias.
    l.zip_code
from {{ ref("orders_cleaned") }} o

left join {{ ref("users_cleaned") }} u
    on o.user_id = u.user_id

left join {{ ref("location_cleaned") }} l
    on o.user_id = l.user_id

{% if is_incremental() %}
WHERE o.order_date >= '{{ var('min_date') }}' AND o.order_date <= '{{ var('max_date') }}'
{% endif %}
21 changes: 21 additions & 0 deletions dbt_project/models/CLEANED/location_cleaned.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Cleaned view of the raw location source: strips the l_ prefix from the
-- business columns and passes the remaining column through unchanged.
with source as (

    select *
    from {{ source("RAW_DATA", "raw_location") }}

),

source_renamed as (

    select
        l_user_id as user_id,
        -- alias previously misspelled as "streed_address"
        l_street_address as street_address,
        l_state as state,
        l_country as country,
        l_zip_code as zip_code,
        -- kept under its source name; presumably a load timestamp, confirm upstream
        l_sling_loaded_at
    from source

)

select *
from source_renamed
1 change: 1 addition & 0 deletions dbt_project/models/sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ sources:
tables:
- name: "orders"
- name: "users"
- name: "raw_location"
- name: "FORECASTING"
tables:
- name: "predicted_orders"
1 change: 1 addition & 0 deletions hooli-demo-assets/hooli_demo_assets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from hooli_demo_assets.definitions import defs as defs
12 changes: 12 additions & 0 deletions hooli-demo-assets/hooli_demo_assets/assets/sling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from dagster import AssetSpec
from dagster_embedded_elt.sling import build_sling_asset, SlingMode


# Sling-backed asset "raw_location": reads CSV data from the S3 stream below and
# writes it to RAW_DATA.LOCATION using the "s3_to_snowflake_resource" resource.
# NOTE(review): the dbt source added alongside this asset is named "raw_location"
# while the target object here is RAW_DATA.LOCATION; confirm both resolve to the
# same table.
raw_location = build_sling_asset(
    sling_resource_key="s3_to_snowflake_resource",
    asset_spec=AssetSpec("raw_location"),
    mode=SlingMode.FULL_REFRESH,
    source_stream="s3://hooli-demo/embedded-elt/",
    source_options={"format": "csv"},
    target_object="RAW_DATA.LOCATION",
)
20 changes: 20 additions & 0 deletions hooli-demo-assets/hooli_demo_assets/definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from dagster import (
Definitions,
load_assets_from_modules,

)

from hooli_demo_assets.assets import sling
from hooli_demo_assets.jobs import daily_sling_job
from hooli_demo_assets.resources import get_env, resource_def
from hooli_demo_assets.schedules import daily_sling_assets


# Every asset found in the sling module, placed in the RAW_DATA asset group.
sling_assets = load_assets_from_modules([sling], group_name="RAW_DATA")

# Definitions object for this package. The resource set is looked up by the
# deployment label that get_env() returns.
defs = Definitions(
    resources=resource_def[get_env()],
    assets=list(sling_assets),
    jobs=[daily_sling_job],
    schedules=[daily_sling_assets],
)
8 changes: 8 additions & 0 deletions hooli-demo-assets/hooli_demo_assets/jobs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from dagster import AssetSelection, define_asset_job

# Selection containing only the "raw_location" asset key.
raw_location_by_day = AssetSelection.keys("raw_location")

# Asset job that materializes the selection above.
daily_sling_job = define_asset_job(
    "daily_sling_job",
    selection=raw_location_by_day,
)
57 changes: 57 additions & 0 deletions hooli-demo-assets/hooli_demo_assets/resources/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import os

from dagster import EnvVar, ResourceDefinition
from dagster_embedded_elt.sling import (
SlingResource,
SlingSourceConnection,
SlingTargetConnection,
)

def get_env():
    """Return the deployment label used to pick a resource set.

    Returns:
        "BRANCH" when DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT equals "1";
        otherwise "PROD" when DAGSTER_CLOUD_DEPLOYMENT_NAME equals
        "data-eng-prod"; otherwise "LOCAL".
    """
    # The branch-deployment flag takes precedence over the deployment name.
    branch_flag = os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "")
    if branch_flag == "1":
        return "BRANCH"

    deployment_name = os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "")
    return "PROD" if deployment_name == "data-eng-prod" else "LOCAL"

# Embedded-ELT connections: one S3 source and one Snowflake target per
# deployment. Bucket, region and credentials are referenced through EnvVar
# rather than written into the file.

source = SlingSourceConnection(
    type="s3",
    bucket=EnvVar("AWS_S3_BUCKET"),
    region=EnvVar("AWS_REGION"),
    access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
    secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
)


def _snowflake_target(database):
    """Build a Snowflake target connection that differs only in its database."""
    return SlingTargetConnection(
        type="snowflake",
        host=EnvVar("SNOWFLAKE_HOST"),
        database=database,
        user=EnvVar("SNOWFLAKE_USER"),
        password=EnvVar("SNOWFLAKE_PASSWORD"),
        role=EnvVar("SNOWFLAKE_ROLE"),
    )


branch_target = _snowflake_target("DEMO_DB2_BRANCH")
prod_target = _snowflake_target("DEMO_DB2")

# Resource sets keyed by the label returned from get_env(). LOCAL gets a
# none-resource placeholder; BRANCH and PROD pair the shared S3 source with
# their own Snowflake target.
resource_def = {
    "LOCAL": {
        "s3_to_snowflake_resource": ResourceDefinition.none_resource(),
    },
    **{
        deployment: {
            "s3_to_snowflake_resource": SlingResource(
                source_connection=source,
                target_connection=target,
            ),
        }
        for deployment, target in (
            ("BRANCH", branch_target),
            ("PROD", prod_target),
        )
    },
}
7 changes: 7 additions & 0 deletions hooli-demo-assets/hooli_demo_assets/schedules/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dagster import ScheduleDefinition
from hooli_demo_assets.jobs import daily_sling_job

# Schedule that launches daily_sling_job once a day.
daily_sling_assets = ScheduleDefinition(
    cron_schedule="0 0 * * *",  # minute 0, hour 0, every day
    job=daily_sling_job,
)

0 comments on commit 6249b68

Please sign in to comment.