-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #53 from dagster-io/al/11-30-add_k8s_pipes_example
add k8s pipes example
- Loading branch information
Showing 9 changed files with 151 additions and 38 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,7 @@ on: | |
pull_request: # For branch deployments | ||
types: [opened, synchronize, reopened, closed] | ||
|
||
concurrency: | ||
concurrency: | ||
# Cancel in-progress deploys to the same branch | ||
group: ${{ github.ref }} | ||
cancel-in-progress: true | ||
|
@@ -28,11 +28,11 @@ jobs: | |
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected] | ||
|
||
- name: Checkout | ||
uses: actions/checkout@v3 | ||
uses: actions/checkout@v3 | ||
if: steps.prerun.outputs.result != 'skip' | ||
with: | ||
ref: ${{ github.head_ref }} | ||
|
||
- name: Validate configuration | ||
id: ci-validate | ||
if: steps.prerun.outputs.result != 'skip' | ||
|
@@ -65,7 +65,7 @@ jobs: | |
- name: Login to ECR | ||
if: ${{ steps.prerun.outputs.result != 'skip' }} | ||
uses: aws-actions/amazon-ecr-login@v1 | ||
|
||
# Build 'data-eng-pipeline' code location | ||
- name: Build dbt manifest for data-eng-pipeline | ||
if: steps.prerun.outputs.result != 'skip' | ||
|
@@ -134,6 +134,15 @@ jobs: | |
with: | ||
command: "ci set-build-output --location-name=snowflake_insights --image-tag=$IMAGE_TAG-snowflake-insights" | ||
|
||
# Build pipes example container | ||
- name: Build and upload Docker image for pipes example | ||
if: steps.prerun.outputs.result != 'skip' | ||
uses: docker/build-push-action@v4 | ||
with: | ||
context: ./hooli_data_eng/utils/example_container | ||
push: true | ||
tags: ${{ env.IMAGE_REGISTRY }}:latest-pipes-example | ||
|
||
# Deploy | ||
- name: Deploy to Dagster Cloud | ||
id: ci-deploy | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,14 +5,16 @@ | |
from dagster_aws.s3 import ConfigurablePickledObjectS3IOManager, S3Resource | ||
from dagster_dbt import DbtCliClientResource, DbtCliResource | ||
from dagster_duckdb_pandas import DuckDBPandasIOManager | ||
from dagster_k8s import PipesK8sClient | ||
from dagster_pyspark import pyspark_resource | ||
from dagster_snowflake_pandas import SnowflakePandasIOManager | ||
from dagstermill import ConfigurableLocalOutputNotebookIOManager | ||
|
||
from hooli_data_eng.resources.api import RawDataAPI | ||
from hooli_data_eng.resources.databricks import db_step_launcher | ||
#from hooli_data_eng.resources.dbt import DbtCli2 as DbtCli | ||
#from hooli_data_eng.resources.warehouse import MySnowflakeIOManager as SnowflakePandasIOManager | ||
|
||
# from hooli_data_eng.resources.dbt import DbtCli2 as DbtCli | ||
# from hooli_data_eng.resources.warehouse import MySnowflakeIOManager as SnowflakePandasIOManager | ||
from hooli_data_eng.resources.sensor_file_managers import s3FileSystem, LocalFileSystem | ||
from hooli_data_eng.resources.sensor_smtp import LocalEmailAlert, SESEmailAlert | ||
|
||
|
@@ -34,13 +36,15 @@ | |
# The production deployment on Dagster Cloud uses production Snowflake | ||
# and S3 resources | ||
|
||
|
||
def get_env(): | ||
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1": | ||
return "BRANCH" | ||
if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod": | ||
return "PROD" | ||
return "LOCAL" | ||
|
||
|
||
client = mock.MagicMock() | ||
|
||
if get_env() == "PROD": | ||
|
@@ -72,14 +76,17 @@ def get_env(): | |
"dbt": DbtCliClientResource( | ||
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL" | ||
), | ||
"dbt2": DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"), | ||
"dbt2": DbtCliResource( | ||
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL" | ||
), | ||
"pyspark": pyspark_resource, | ||
"step_launcher": ResourceDefinition.none_resource(), | ||
"monitor_fs": LocalFileSystem(base_dir=file_relative_path(__file__, ".")), | ||
"email": LocalEmailAlert( | ||
smtp_email_to=["[email protected]"], smtp_email_from="[email protected]" | ||
), | ||
"pipes_client": ResourceDefinition.none_resource(), | ||
"pipes_databricks_client": ResourceDefinition.none_resource(), | ||
"pipes_k8s_client": ResourceDefinition.none_resource(), | ||
}, | ||
"BRANCH": { | ||
"io_manager": SnowflakePandasIOManager( | ||
|
@@ -98,14 +105,17 @@ def get_env(): | |
"dbt": DbtCliClientResource( | ||
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH" | ||
), | ||
"dbt2": DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"), | ||
"dbt2": DbtCliResource( | ||
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH" | ||
), | ||
"pyspark": pyspark_resource, | ||
"step_launcher": db_step_launcher, | ||
"monitor_fs": s3FileSystem( | ||
region_name="us-west-2", s3_bucket="hooli-demo-branch" | ||
), | ||
"email": ResourceDefinition.none_resource(), | ||
"pipes_client": ResourceDefinition.none_resource(), | ||
"pipes_databricks_client": ResourceDefinition.none_resource(), | ||
"pipes_k8s_client": PipesK8sClient(), | ||
}, | ||
"PROD": { | ||
"io_manager": SnowflakePandasIOManager( | ||
|
@@ -124,7 +134,9 @@ def get_env(): | |
"dbt": DbtCliClientResource( | ||
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD" | ||
), | ||
"dbt2": DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"), | ||
"dbt2": DbtCliResource( | ||
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD" | ||
), | ||
"pyspark": pyspark_resource, | ||
"step_launcher": db_step_launcher, | ||
"monitor_fs": s3FileSystem(region_name="us-west-2", s3_bucket="hooli-demo"), | ||
|
@@ -135,6 +147,7 @@ def get_env(): | |
smtp_username=EnvVar("SMTP_USERNAME"), | ||
smtp_password=EnvVar("SMTP_PASSWORD"), | ||
), | ||
"pipes_client": PipesDatabricksClient(client) | ||
"pipes_databricks_client": PipesDatabricksClient(client), | ||
"pipes_k8s_client": PipesK8sClient(), | ||
}, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
FROM python:3.10-slim | ||
|
||
RUN pip install dagster-pipes | ||
|
||
COPY script.py . | ||
|
||
CMD ["python", "script.py"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
#### | ||
# This represents an "external" script that is mostly independent of dagster | ||
# that dagster will orchestrate via pipes | ||
|
||
from dagster_pipes import open_dagster_pipes | ||
import random | ||
|
||
with open_dagster_pipes() as context: | ||
sample_rate = context.get_extra("sample_rate") | ||
|
||
# Stream log message back to Dagster | ||
context.log.info(f"Using sample rate: {sample_rate}") | ||
|
||
# ... your code that computes and persists the asset | ||
|
||
# Stream asset materialization metadata and data version back to Dagster. | ||
# This should be called after you've computed and stored the asset value. We | ||
# omit the asset key here because there is only one asset in scope, but for | ||
# multi-assets you can pass an `asset_key` parameter. | ||
context.report_asset_materialization( | ||
metadata={ | ||
"some_spark_metric": random.choice(["scranton", "new york", "tallahassee"]) | ||
}, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,26 +1,33 @@ | ||
#### | ||
# This reprsents an "external" script that is mostly independent of dagster | ||
# This represents an "external" script that is mostly independent of dagster | ||
# that dagster will orchestrate via pipes | ||
# This script must be uploaded to Databricks manually (or via some other process) | ||
# `dagster_pipes` must be available in the databricks python environment | ||
|
||
from dagster_pipes import PipesDbfsContextLoader, PipesDbfsMessageWriter, PipesContext, open_dagster_pipes | ||
import random | ||
from dagster_pipes import ( | ||
PipesDbfsContextLoader, | ||
PipesDbfsMessageWriter, | ||
open_dagster_pipes, | ||
) | ||
import random | ||
|
||
with open_dagster_pipes(context_loader=PipesDbfsContextLoader(), message_writer=PipesDbfsMessageWriter()) as context: | ||
|
||
with open_dagster_pipes( | ||
context_loader=PipesDbfsContextLoader(), | ||
message_writer=PipesDbfsMessageWriter(), | ||
) as context: | ||
sample_rate = context.get_extra("sample_rate") | ||
|
||
# Stream log message back to Dagster | ||
context.log.info(f"Using sample rate: {sample_rate}") | ||
|
||
# ... your code that computes and persists the asset | ||
|
||
|
||
# Stream asset materialization metadata and data version back to Dagster. | ||
# This should be called after you've computed and stored the asset value. We | ||
# omit the asset key here because there is only one asset in scope, but for | ||
# multi-assets you can pass an `asset_key` parameter. | ||
context.report_asset_materialization( | ||
metadata={"some_spark_metric": random.choice(["scranton", "new york", "tallahassee"])}, | ||
) | ||
metadata={ | ||
"some_spark_metric": random.choice(["scranton", "new york", "tallahassee"]) | ||
}, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters