add k8s pipes example
alangenfeld committed Nov 30, 2023
1 parent 3b43ceb commit 614ae00
Showing 8 changed files with 125 additions and 36 deletions.
17 changes: 13 additions & 4 deletions .github/workflows/deploy-dagster-cloud.yml
@@ -7,7 +7,7 @@ on:
pull_request: # For branch deployments
types: [opened, synchronize, reopened, closed]

concurrency:
concurrency:
# Cancel in-progress deploys to the same branch
group: ${{ github.ref }}
cancel-in-progress: true
@@ -28,11 +28,11 @@ jobs:
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected]

- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v3
if: steps.prerun.outputs.result != 'skip'
with:
ref: ${{ github.head_ref }}

- name: Validate configuration
id: ci-validate
if: steps.prerun.outputs.result != 'skip'
@@ -65,7 +65,7 @@ jobs:
- name: Login to ECR
if: ${{ steps.prerun.outputs.result != 'skip' }}
uses: aws-actions/amazon-ecr-login@v1

# Build 'data-eng-pipeline' code location
- name: Build dbt manifest for data-eng-pipeline
if: steps.prerun.outputs.result != 'skip'
@@ -134,6 +134,15 @@ jobs:
with:
command: "ci set-build-output --location-name=snowflake_insights --image-tag=$IMAGE_TAG-snowflake-insights"

# Build pipes example container
- name: Build and upload Docker image for pipes example
if: steps.prerun.outputs.result != 'skip'
uses: docker/build-push-action@v4
with:
context: ./hooli_data_eng/utils/example_container
push: true
tags: ${{ env.IMAGE_REGISTRY }}:latest-pipes-example

# Deploy
- name: Deploy to Dagster Cloud
id: ci-deploy
45 changes: 37 additions & 8 deletions hooli_data_eng/assets/forecasting/__init__.py
@@ -1,4 +1,5 @@
from typing import Any, Tuple
from dagster_k8s import PipesK8sClient

import numpy as np
import pandas as pd
@@ -10,10 +11,9 @@
MonthlyPartitionsDefinition,
Output,
Field,
Int,
Config,
AssetExecutionContext,
MaterializeResult
MaterializeResult,
)
from dagstermill import define_dagstermill_asset
from dagster._utils import file_relative_path
@@ -26,6 +26,10 @@ def model_func(x, a, b):
return a * np.exp(b * (x / 10**18 - 1.6095))


CONTAINER_REGISTRY = (
"764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod"
)

# ----- Forecasting Assets -----
# These assets live downstream of tables created by dbt
# which are referenced by the key_prefix 'analytics',
@@ -173,21 +177,23 @@ def big_orders(context, predicted_orders: pd.DataFrame):
required_resource_keys={"io_manager"},
)


# This databricks pipes asset only runs in prod, see utils/external_databricks_script.py
# The dependency on predicted_orders is not a real dependency since the script does not rely
# on or use that upstream Snowflake table; it is used here for illustrative purposes
@asset(
deps=[predicted_orders],
compute_kind="databricks",
deps=[predicted_orders],
compute_kind="databricks",
)
def databricks_asset(
context: AssetExecutionContext, pipes_client: PipesDatabricksClient
context: AssetExecutionContext,
pipes_databricks_client: PipesDatabricksClient,
) -> MaterializeResult:
# cluster config
cluster_config = {
"num_workers": 1,
"spark_version": "11.2.x-scala2.12",
"node_type_id": "i3.xlarge"
"node_type_id": "i3.xlarge",
}

# task specification will be passed to Databricks as-is, except for the
@@ -213,8 +219,31 @@ def databricks_asset(
extras = {"sample_rate": 1.0}

# synchronously execute the databricks job
return pipes_client.run(
return pipes_databricks_client.run(
task=task,
context=context,
extras=extras,
).get_materialize_result()
).get_materialize_result()


# This k8s pipes asset only runs in prod, see utils/example_container
# The dependency on predicted_orders is not a real dependency since the script does not rely
# on or use that upstream Snowflake table; it is used here for illustrative purposes
@asset(
deps=[predicted_orders],
compute_kind="kubernetes",
)
def k8s_pod_asset(
context: AssetExecutionContext,
pipes_k8s_client: PipesK8sClient,
) -> MaterializeResult:
# Arbitrary json-serializable data you want access to from the `PipesSession`
# in the k8s pod container. Assume `sample_rate` is a parameter used by
# the target job's business logic.
extras = {"sample_rate": 1.0}

return pipes_k8s_client.run(
context=context,
image=f"{CONTAINER_REGISTRY}:latest-pipes-example",
extras=extras,
).get_materialize_result()
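
The new asset can be exercised locally the same way the Databricks asset is exercised in hooli_data_eng_tests/test_databricks_pipes.py below; a minimal sketch, assuming a local kubeconfig whose credentials can schedule pods and pull the example image:

from dagster import materialize
from dagster_k8s import PipesK8sClient

from hooli_data_eng.assets.forecasting import k8s_pod_asset

# assumes kubeconfig credentials that can schedule pods and pull the
# latest-pipes-example image from the private ECR registry
result = materialize(
    [k8s_pod_asset],
    resources={"pipes_k8s_client": PipesK8sClient()},
    raise_on_error=False,
)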
28 changes: 20 additions & 8 deletions hooli_data_eng/resources/__init__.py
@@ -5,14 +5,16 @@
from dagster_aws.s3 import ConfigurablePickledObjectS3IOManager, S3Resource
from dagster_dbt import DbtCliClientResource, DbtCliResource
from dagster_duckdb_pandas import DuckDBPandasIOManager
from dagster_k8s import PipesK8sClient
from dagster_pyspark import pyspark_resource
from dagster_snowflake_pandas import SnowflakePandasIOManager
from dagstermill import ConfigurableLocalOutputNotebookIOManager

from hooli_data_eng.resources.api import RawDataAPI
from hooli_data_eng.resources.databricks import db_step_launcher
#from hooli_data_eng.resources.dbt import DbtCli2 as DbtCli
#from hooli_data_eng.resources.warehouse import MySnowflakeIOManager as SnowflakePandasIOManager

# from hooli_data_eng.resources.dbt import DbtCli2 as DbtCli
# from hooli_data_eng.resources.warehouse import MySnowflakeIOManager as SnowflakePandasIOManager
from hooli_data_eng.resources.sensor_file_managers import s3FileSystem, LocalFileSystem
from hooli_data_eng.resources.sensor_smtp import LocalEmailAlert, SESEmailAlert

@@ -34,13 +36,15 @@
# The production deployment on Dagster Cloud uses production Snowflake
# and S3 resources


def get_env():
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
return "BRANCH"
if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod":
return "PROD"
return "LOCAL"


client = mock.MagicMock()

if get_env() == "PROD":
@@ -72,7 +76,9 @@ def get_env():
"dbt": DbtCliClientResource(
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"
),
"dbt2": DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"),
"dbt2": DbtCliResource(
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"
),
"pyspark": pyspark_resource,
"step_launcher": ResourceDefinition.none_resource(),
"monitor_fs": LocalFileSystem(base_dir=file_relative_path(__file__, ".")),
@@ -98,14 +104,17 @@ def get_env():
"dbt": DbtCliClientResource(
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"
),
"dbt2": DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"),
"dbt2": DbtCliResource(
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"
),
"pyspark": pyspark_resource,
"step_launcher": db_step_launcher,
"monitor_fs": s3FileSystem(
region_name="us-west-2", s3_bucket="hooli-demo-branch"
),
"email": ResourceDefinition.none_resource(),
"pipes_client": ResourceDefinition.none_resource(),
"pipes_databricks_client": ResourceDefinition.none_resource(),
"pipes_k8s_client": ResourceDefinition.none_resource(),
},
"PROD": {
"io_manager": SnowflakePandasIOManager(
@@ -124,7 +133,9 @@ def get_env():
"dbt": DbtCliClientResource(
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"
),
"dbt2": DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"),
"dbt2": DbtCliResource(
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"
),
"pyspark": pyspark_resource,
"step_launcher": db_step_launcher,
"monitor_fs": s3FileSystem(region_name="us-west-2", s3_bucket="hooli-demo"),
@@ -135,6 +146,7 @@ def get_env():
smtp_username=EnvVar("SMTP_USERNAME"),
smtp_password=EnvVar("SMTP_PASSWORD"),
),
"pipes_client": PipesDatabricksClient(client)
"pipes_databricks_client": PipesDatabricksClient(client),
"pipes_k8s_client": PipesK8sClient(),
},
}
}
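
Note that the keys in these per-environment maps must match the parameter names on the asset functions, which is why renaming pipes_client to pipes_databricks_client also touches the test file below. A hypothetical sketch of how such a map is handed to Definitions, assuming the dict above is named resource_def (the real wiring lives elsewhere in the repo):

from dagster import Definitions

# hypothetical wiring, not part of this commit: resolve the environment
# once and pass the matching resource map to Definitions
defs = Definitions(
    assets=[databricks_asset, k8s_pod_asset],
    resources=resource_def[get_env()],
)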
7 changes: 7 additions & 0 deletions hooli_data_eng/utils/example_container/Dockerfile
@@ -0,0 +1,7 @@
FROM python:3.10-slim

RUN pip install dagster-pipes

COPY script.py .

CMD ["python", "script.py"]
24 changes: 24 additions & 0 deletions hooli_data_eng/utils/example_container/script.py
@@ -0,0 +1,24 @@
####
# This represents an "external" script that is mostly independent of dagster
# that dagster will orchestrate via pipes

from dagster_pipes import open_dagster_pipes
import random

with open_dagster_pipes() as context:
sample_rate = context.get_extra("sample_rate")

# Stream log message back to Dagster
context.log.info(f"Using sample rate: {sample_rate}")

# ... your code that computes and persists the asset

# Stream asset materialization metadata and data version back to Dagster.
# This should be called after you've computed and stored the asset value. We
# omit the asset key here because there is only one asset in scope, but for
# multi-assets you can pass an `asset_key` parameter.
context.report_asset_materialization(
metadata={
"some_spark_metric": random.choice(["scranton", "new york", "tallahassee"])
},
)
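
As the closing comment notes, asset_key disambiguates reports when several assets are in scope; a hypothetical multi-asset variant of the same call:

# hypothetical: in a multi-asset script, name the asset being reported
context.report_asset_materialization(
    asset_key="orders_summary",  # hypothetical key, not part of this commit
    metadata={"row_count": 42},
)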
23 changes: 15 additions & 8 deletions hooli_data_eng/utils/external_databricks_script.py
@@ -1,26 +1,33 @@
####
# This reprsents an "external" script that is mostly independent of dagster
# This represents an "external" script that is mostly independent of dagster
# that dagster will orchestrate via pipes
# This script must be uploaded to Databricks manually (or via some other process)
# `dagster_pipes` must be available in the databricks python environment

from dagster_pipes import PipesDbfsContextLoader, PipesDbfsMessageWriter, PipesContext, open_dagster_pipes
import random
from dagster_pipes import (
PipesDbfsContextLoader,
PipesDbfsMessageWriter,
open_dagster_pipes,
)
import random

with open_dagster_pipes(context_loader=PipesDbfsContextLoader(), message_writer=PipesDbfsMessageWriter()) as context:

with open_dagster_pipes(
context_loader=PipesDbfsContextLoader(),
message_writer=PipesDbfsMessageWriter(),
) as context:
sample_rate = context.get_extra("sample_rate")

# Stream log message back to Dagster
context.log.info(f"Using sample rate: {sample_rate}")

# ... your code that computes and persists the asset


# Stream asset materialization metadata and data version back to Dagster.
# This should be called after you've computed and stored the asset value. We
# omit the asset key here because there is only one asset in scope, but for
# multi-assets you can pass an `asset_key` parameter.
context.report_asset_materialization(
metadata={"some_spark_metric": random.choice(["scranton", "new york", "tallahassee"])},
)
metadata={
"some_spark_metric": random.choice(["scranton", "new york", "tallahassee"])
},
)
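
The explicit loader and writer are the main difference from the k8s example above: by default the k8s client delivers context through the pod's environment and reads messages back from the pod's logs, while a Databricks job shares neither channel with Dagster, so both sides are exchanged via DBFS. A side-by-side sketch of the two open_dagster_pipes calls:

# k8s example container: the defaults are enough
with open_dagster_pipes() as context:
    ...

# Databricks: context and messages round-trip through DBFS
with open_dagster_pipes(
    context_loader=PipesDbfsContextLoader(),
    message_writer=PipesDbfsMessageWriter(),
) as context:
    ...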
6 changes: 3 additions & 3 deletions hooli_data_eng_tests/test_databricks_pipes.py
@@ -4,17 +4,17 @@
from dagster_databricks import PipesDatabricksClient

# to meaningfully run this test you must set
# DAGSTER_CLOUD_DEPLOYMENT_NAME="data-eng-prod"
# DAGSTER_CLOUD_DEPLOYMENT_NAME="data-eng-prod"
# and also set
# DATABRICKS_HOST="your host"
# DATABRICKS_TOKEN="your token"

result = materialize(
[databricks_asset],
resources={
"pipes_client": PipesDatabricksClient(
"pipes_databricks_client": PipesDatabricksClient(
client,
)
},
raise_on_error=False,
)
)
11 changes: 6 additions & 5 deletions setup.py
@@ -13,7 +13,7 @@
"scipy",
"dbt-core",
"dbt-duckdb",
"dbt-snowflake",
"dbt-snowflake",
"dagster-duckdb",
"dagster-aws",
"dagster-duckdb-pandas",
@@ -22,14 +22,15 @@
"dagster-cloud",
"dagster-pyspark",
"dagster-databricks",
"dagster-k8s",
"dagstermill",
"gql",
"plotnine",
"responses",
"responses",
"requests",
"requests_toolbelt",
"html5lib",
"scikit-learn"
"html5lib",
"scikit-learn",
],
extras_require={"dev": ["dagit", "pytest"]},
)
)
