Skip to content

Commit

Permalink
Merge pull request #52 from dagster-io/izzy/add-embedded-elt
Browse files Browse the repository at this point in the history
Izzy/add embedded elt
  • Loading branch information
izzye84 authored Dec 12, 2023
2 parents 43674aa + c0eba31 commit 2a85d8d
Show file tree
Hide file tree
Showing 15 changed files with 187 additions and 5 deletions.
16 changes: 16 additions & 0 deletions .github/workflows/deploy-dagster-cloud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,22 @@ jobs:
with:
command: "ci set-build-output --location-name=snowflake_insights --image-tag=$IMAGE_TAG-snowflake-insights"

# Build 'demo_assets' code location
# Both steps below are skipped when the `prerun` step's result is 'skip'.
# Step 1 builds the image from ./hooli-demo-assets and pushes it, tagged
# <IMAGE_REGISTRY>:<IMAGE_TAG>-demo-assets.
- name: Build and upload Docker image for demo_assets
if: steps.prerun.outputs.result != 'skip'
uses: docker/build-push-action@v4
with:
context: ./hooli-demo-assets
push: true
tags: ${{ env.IMAGE_REGISTRY }}:${{ env.IMAGE_TAG }}-demo-assets

# Step 2 passes the same "-demo-assets" tag suffix to `ci set-build-output`
# for the demo_assets location.
# NOTE(review): the action ref below looks mangled by email obfuscation in this
# capture (presumably `.../dagster-cloud-cli@<version>`) — confirm against the repository.
- name: Update build session with image tag for demo_assets
id: ci-set-build-output-demo-assets
if: steps.prerun.outputs.result != 'skip'
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected]
with:
command: "ci set-build-output --location-name=demo_assets --image-tag=$IMAGE_TAG-demo-assets"

# Build pipes example container
- name: Build and upload Docker image for pipes example
if: steps.prerun.outputs.result != 'skip'
Expand Down
6 changes: 6 additions & 0 deletions dagster_cloud.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,9 @@ locations:
directory: ./hooli_snowflake_insights
registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod

# Code location `demo_assets`: loads the `hooli_demo_assets` Python package,
# built from the ./hooli-demo-assets directory and pushed to the registry below.
- location_name: demo_assets
code_source:
package_name: hooli_demo_assets
build:
directory: ./hooli-demo-assets
registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod
15 changes: 11 additions & 4 deletions dbt_project/models/ANALYTICS/orders_augmented.sql
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
select
o.*,
u.company
from
{{ ref("orders_cleaned") }} o left join
{{ ref("users_cleaned") }} u on (o.user_id = u.user_id)
u.company,
l.state,
-- locations_cleaned renames l_zip_code to zip_code, so the column must be
-- referenced through the `l` alias (the bare name `l_zip_code` does not exist there).
l.zip_code
from {{ ref("orders_cleaned") }} o

-- Left joins keep every order even when no matching user/location row exists.
left join {{ ref("users_cleaned") }} u
on o.user_id = u.user_id

left join {{ ref("locations_cleaned") }} l
on o.user_id = l.user_id

-- On incremental runs, restrict to orders inside the [min_date, max_date] window
-- supplied through dbt vars.
{% if is_incremental() %}
WHERE o.order_date >= '{{ var('min_date') }}' AND o.order_date <= '{{ var('max_date') }}'
{% endif %}
21 changes: 21 additions & 0 deletions dbt_project/models/CLEANED/locations_cleaned.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Cleaned location model: selects everything from the RAW_DATA.location source
-- and strips the `l_` prefix from its column names.
with source as (

    select *
    from {{ source("RAW_DATA", "location") }}

),

source_renamed as (

    select
        l_user_id as user_id,
        -- alias was previously misspelled `streed_address`
        l_street_address as street_address,
        l_state as state,
        l_country as country,
        l_zip_code as zip_code,
        -- passed through unchanged (presumably the load timestamp written by Sling — confirm)
        _sling_loaded_at
    from source

)

select *
from source_renamed
4 changes: 4 additions & 0 deletions dbt_project/models/sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ sources:
tables:
- name: "orders"
- name: "users"
# Raw location table; `meta.dagster.asset_key` pins the Dagster asset key used for this dbt source.
# NOTE(review): hooli_demo_assets loads the Sling asset with key_prefix="RAW_DATA" and its job
# selects ["RAW_DATA", "location"] — confirm this key (after any dbt translator logic) resolves
# to the same asset so the lineage connects.
- name: "location"
meta:
dagster:
asset_key: ['location']
- name: "FORECASTING"
tables:
- name: "predicted_orders"
7 changes: 7 additions & 0 deletions hooli-demo-assets/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Image for the hooli_demo_assets code location.
FROM python:3.10-slim

WORKDIR /opt/dagster/app

# COPY instead of ADD: only local build-context files are copied, so ADD's
# extra behaviors (remote URL fetch, automatic archive extraction) are not wanted.
COPY . .

# Install the package from the working directory in editable mode;
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir -e .
1 change: 1 addition & 0 deletions hooli-demo-assets/hooli_demo_assets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from hooli_demo_assets.definitions import defs as defs
12 changes: 12 additions & 0 deletions hooli-demo-assets/hooli_demo_assets/assets/sling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from dagster import AssetSpec
from dagster_embedded_elt.sling import build_sling_asset, SlingMode


# Sling asset with key ["location"]: reads the CSV stream at the S3 path below
# and writes it to the RAW_DATA.LOCATION target object in full-refresh mode.
# The connections come from the resource registered under
# "s3_to_snowflake_resource".
raw_location = build_sling_asset(
    asset_spec=AssetSpec(key=["location"]),
    sling_resource_key="s3_to_snowflake_resource",
    source_stream="s3://hooli-demo/embedded-elt/",
    source_options={"format": "csv"},
    target_object="RAW_DATA.LOCATION",
    mode=SlingMode.FULL_REFRESH,
)
20 changes: 20 additions & 0 deletions hooli-demo-assets/hooli_demo_assets/definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from dagster import (
Definitions,
load_assets_from_modules,

)

from hooli_demo_assets.assets import sling
from hooli_demo_assets.jobs import daily_sling_job
from hooli_demo_assets.resources import get_env, resource_def
from hooli_demo_assets.schedules import daily_sling_assets


# Load every asset defined in the sling module, prefixing each key with
# RAW_DATA and placing the assets in the RAW_DATA group.
sling_assets = load_assets_from_modules(
    [sling],
    group_name="RAW_DATA",
    key_prefix="RAW_DATA",
)

# Definitions for this code location; the resource set is chosen by the
# environment name that get_env() returns.
defs = Definitions(
    assets=list(sling_assets),
    jobs=[daily_sling_job],
    schedules=[daily_sling_assets],
    resources=resource_def[get_env()],
)
8 changes: 8 additions & 0 deletions hooli-demo-assets/hooli_demo_assets/jobs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from dagster import AssetSelection, define_asset_job

# Selection holding the single asset key RAW_DATA/location.
raw_location_by_day = AssetSelection.keys(["RAW_DATA", "location"])

# Asset job that targets exactly that selection.
daily_sling_job = define_asset_job("daily_sling_job", selection=raw_location_by_day)
57 changes: 57 additions & 0 deletions hooli-demo-assets/hooli_demo_assets/resources/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import os

from dagster import EnvVar, ResourceDefinition
from dagster_embedded_elt.sling import (
SlingResource,
SlingSourceConnection,
SlingTargetConnection,
)

def get_env():
    """Return the environment name used to pick resources.

    "BRANCH" when DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT is "1"; otherwise "PROD"
    when DAGSTER_CLOUD_DEPLOYMENT_NAME is "data-eng-prod"; otherwise "LOCAL".
    """
    is_branch = os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1"
    deployment_name = os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "")

    # The branch flag takes precedence over the deployment name.
    if is_branch:
        return "BRANCH"
    return "PROD" if deployment_name == "data-eng-prod" else "LOCAL"

# Embedded-ELT (Sling) connections: one S3 source and one Snowflake target per
# deployed environment. Connection settings are read from environment
# variables via EnvVar; only the target database name is hard-coded.

source = SlingSourceConnection(
    type="s3",
    bucket=EnvVar("AWS_S3_BUCKET"),
    region=EnvVar("AWS_REGION"),
    access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
    secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
)


def _snowflake_target(database):
    """Build a Snowflake target connection pointing at ``database``."""
    return SlingTargetConnection(
        type="snowflake",
        host=EnvVar("SNOWFLAKE_HOST"),
        database=database,
        user=EnvVar("SNOWFLAKE_USER"),
        password=EnvVar("SNOWFLAKE_PASSWORD"),
        role=EnvVar("SNOWFLAKE_ROLE"),
    )


branch_target = _snowflake_target("DEMO_DB2_BRANCH")
prod_target = _snowflake_target("DEMO_DB2")

# Resources keyed by the environment names that get_env() returns.
# LOCAL binds the Sling resource key to a none-resource instead of a real connection.
resource_def = {
    "LOCAL": {
        "s3_to_snowflake_resource": ResourceDefinition.none_resource(),
    },
    "BRANCH": {
        "s3_to_snowflake_resource": SlingResource(
            source_connection=source,
            target_connection=branch_target,
        ),
    },
    "PROD": {
        "s3_to_snowflake_resource": SlingResource(
            source_connection=source,
            target_connection=prod_target,
        ),
    },
}
7 changes: 7 additions & 0 deletions hooli-demo-assets/hooli_demo_assets/schedules/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dagster import ScheduleDefinition
from hooli_demo_assets.jobs import daily_sling_job

# Schedule that launches daily_sling_job once a day at midnight.
daily_sling_assets = ScheduleDefinition(
    cron_schedule="0 0 * * *",  # minute 0, hour 0, every day
    job=daily_sling_job,
)
12 changes: 12 additions & 0 deletions hooli-demo-assets/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from setuptools import find_packages, setup

# Packages required at runtime by the hooli_demo_assets code location.
_install_requires = [
    "dagster",
    "dagster-cloud",
    "dagster-embedded-elt",
]

# Optional extras, installed with `pip install .[dev]`.
_extras_require = {"dev": ["dagit", "pytest"]}

setup(
    name="hooli_demo_assets",
    packages=find_packages(),
    install_requires=_install_requires,
    extras_require=_extras_require,
)
2 changes: 1 addition & 1 deletion hooli_data_eng/assets/dbt_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,6 @@ def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
# dbt models loaded as Dagster assets using the views translator; `select` is a
# space-separated list of dbt model names.
# (Diff view: the first `select=` line is the pre-change value and the second is
# its replacement, which adds `locations_cleaned`.)
dbt_views = load_assets_from_dbt_project(
DBT_PROJECT_DIR,
DBT_PROFILES_DIR,
select="company_perf sku_stats company_stats",
select="company_perf sku_stats company_stats locations_cleaned",
dagster_dbt_translator=CustomDagsterDbtTranslatorForViews()
)
4 changes: 4 additions & 0 deletions workspace.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
load_from:
- python_module:
module_name: hooli_data_eng
# Example of loading multiple code locations locally (uncomment to also load hooli_demo_assets):
# - python_module:
# module_name: hooli_demo_assets
# working_directory: hooli-demo-assets/

0 comments on commit 2a85d8d

Please sign in to comment.