From 614ae002e00cb02d4435406291280ea12af5caec Mon Sep 17 00:00:00 2001
From: alangenfeld
Date: Thu, 30 Nov 2023 15:55:14 -0600
Subject: [PATCH] add k8s pipes example

---
 .github/workflows/deploy-dagster-cloud.yml    | 17 +++++--
 hooli_data_eng/assets/forecasting/__init__.py | 45 +++++++++++++++----
 hooli_data_eng/resources/__init__.py          | 28 ++++++++----
 .../utils/example_container/Dockerfile        |  7 +++
 .../utils/example_container/script.py         | 24 ++++++++++
 .../utils/external_databricks_script.py       | 23 ++++++----
 hooli_data_eng_tests/test_databricks_pipes.py |  6 +--
 setup.py                                      | 11 ++---
 8 files changed, 125 insertions(+), 36 deletions(-)
 create mode 100644 hooli_data_eng/utils/example_container/Dockerfile
 create mode 100644 hooli_data_eng/utils/example_container/script.py

diff --git a/.github/workflows/deploy-dagster-cloud.yml b/.github/workflows/deploy-dagster-cloud.yml
index 5ad06ab3..c8375187 100644
--- a/.github/workflows/deploy-dagster-cloud.yml
+++ b/.github/workflows/deploy-dagster-cloud.yml
@@ -7,7 +7,7 @@ on:
   pull_request: # For branch deployments
     types: [opened, synchronize, reopened, closed]
 
-concurrency: 
+concurrency: # Cancel in-progress deploys to the same branch
   group: ${{ github.ref }}
   cancel-in-progress: true
 
@@ -28,11 +28,11 @@ jobs:
         uses: dagster-io/dagster-cloud-action/actions/utils/prerun@v0.1.27
 
       - name: Checkout
-        uses: actions/checkout@v3 
+        uses: actions/checkout@v3
        if: steps.prerun.outputs.result != 'skip'
        with:
          ref: ${{ github.head_ref }}
-      
+
      - name: Validate configuration
        id: ci-validate
        if: steps.prerun.outputs.result != 'skip'
@@ -65,7 +65,7 @@
      - name: Login to ECR
        if: ${{ steps.prerun.outputs.result != 'skip' }}
        uses: aws-actions/amazon-ecr-login@v1
-      
+
      # Build 'data-eng-pipeline' code location
      - name: Build dbt manifest for data-eng-pipeline
        if: steps.prerun.outputs.result != 'skip'
@@ -134,6 +134,15 @@
        with:
          command: "ci set-build-output --location-name=snowflake_insights --image-tag=$IMAGE_TAG-snowflake-insights"
 
+      # Build pipes example container
+      - name: Build and upload Docker image for pipes example
+        if: steps.prerun.outputs.result != 'skip'
+        uses: docker/build-push-action@v4
+        with:
+          context: ./hooli_data_eng/utils/example_container
+          push: true
+          tags: ${{ env.IMAGE_REGISTRY }}:latest-pipes-example
+
      # Deploy
      - name: Deploy to Dagster Cloud
        id: ci-deploy
diff --git a/hooli_data_eng/assets/forecasting/__init__.py b/hooli_data_eng/assets/forecasting/__init__.py
index 7df195d2..6bdacbd6 100644
--- a/hooli_data_eng/assets/forecasting/__init__.py
+++ b/hooli_data_eng/assets/forecasting/__init__.py
@@ -1,4 +1,5 @@
 from typing import Any, Tuple
+from dagster_k8s import PipesK8sClient
 
 import numpy as np
 import pandas as pd
@@ -10,10 +11,9 @@
     MonthlyPartitionsDefinition,
     Output,
     Field,
-    Int,
     Config,
     AssetExecutionContext,
-    MaterializeResult
+    MaterializeResult,
 )
 from dagstermill import define_dagstermill_asset
 from dagster._utils import file_relative_path
@@ -26,6 +26,10 @@ def model_func(x, a, b):
     return a * np.exp(b * (x / 10**18 - 1.6095))
 
 
+CONTAINER_REGISTRY = (
+    "764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod"
+)
+
 # ----- Forecasting Assets -----
 # These assets live downstream of tables created by dbt
 # which are referenced by the key_prefix 'analytics',
@@ -173,21 +177,23 @@ def big_orders(context, predicted_orders: pd.DataFrame):
     required_resource_keys={"io_manager"},
 )
 
+
 # This databricks pipes asset only runs in prod, see utils/external_databricks_script.py
 # The dependency on predicted_orders is not a real dependency since the script does not rely
 # or use that upstream Snowflake table, it is used here for illustrative purposes
 @asset(
-    deps=[predicted_orders],
-    compute_kind="databricks",
+    deps=[predicted_orders],
+    compute_kind="databricks",
 )
 def databricks_asset(
-    context: AssetExecutionContext, pipes_client: PipesDatabricksClient
+    context: AssetExecutionContext,
+    pipes_databricks_client: PipesDatabricksClient,
 ) -> MaterializeResult:
     # cluster config
     cluster_config = {
         "num_workers": 1,
         "spark_version": "11.2.x-scala2.12",
-        "node_type_id": "i3.xlarge"
+        "node_type_id": "i3.xlarge",
     }
 
     # task specification will be passed to Databricks as-is, except for the
@@ -213,8 +219,31 @@ def databricks_asset(
     extras = {"sample_rate": 1.0}
 
     # synchronously execute the databricks job
-    return pipes_client.run(
+    return pipes_databricks_client.run(
         task=task,
         context=context,
         extras=extras,
-    ).get_materialize_result()
\ No newline at end of file
+    ).get_materialize_result()
+
+
+# This k8s pipes asset only runs in prod, see utils/example_container
+# The dependency on predicted_orders is not a real dependency since the script does not rely on
+# or use that upstream Snowflake table; it is used here for illustrative purposes
+@asset(
+    deps=[predicted_orders],
+    compute_kind="kubernetes",
+)
+def k8s_pod_asset(
+    context: AssetExecutionContext,
+    pipes_k8s_client: PipesK8sClient,
+) -> MaterializeResult:
+    # Arbitrary json-serializable data you want access to from the `PipesSession`
+    # in the k8s pod container. Assume `sample_rate` is a parameter used by
+    # the target job's business logic.
+    extras = {"sample_rate": 1.0}
+
+    return pipes_k8s_client.run(
+        context=context,
+        image=f"{CONTAINER_REGISTRY}:latest-pipes-example",
+        extras=extras,
+    ).get_materialize_result()
diff --git a/hooli_data_eng/resources/__init__.py b/hooli_data_eng/resources/__init__.py
index 7f809acb..e3e507af 100644
--- a/hooli_data_eng/resources/__init__.py
+++ b/hooli_data_eng/resources/__init__.py
@@ -5,14 +5,16 @@
 from dagster_aws.s3 import ConfigurablePickledObjectS3IOManager, S3Resource
 from dagster_dbt import DbtCliClientResource, DbtCliResource
 from dagster_duckdb_pandas import DuckDBPandasIOManager
+from dagster_k8s import PipesK8sClient
 from dagster_pyspark import pyspark_resource
 from dagster_snowflake_pandas import SnowflakePandasIOManager
 from dagstermill import ConfigurableLocalOutputNotebookIOManager
 
 from hooli_data_eng.resources.api import RawDataAPI
 from hooli_data_eng.resources.databricks import db_step_launcher
-#from hooli_data_eng.resources.dbt import DbtCli2 as DbtCli
-#from hooli_data_eng.resources.warehouse import MySnowflakeIOManager as SnowflakePandasIOManager
+
+# from hooli_data_eng.resources.dbt import DbtCli2 as DbtCli
+# from hooli_data_eng.resources.warehouse import MySnowflakeIOManager as SnowflakePandasIOManager
 from hooli_data_eng.resources.sensor_file_managers import s3FileSystem, LocalFileSystem
 from hooli_data_eng.resources.sensor_smtp import LocalEmailAlert, SESEmailAlert
 
@@ -34,6 +36,7 @@
 # The production deployment on Dagster Cloud uses production Snowflake
 # and S3 resources
 
+
 def get_env():
     if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
         return "BRANCH"
@@ -41,6 +44,7 @@ def get_env():
         return "PROD"
     return "LOCAL"
 
+
 client = mock.MagicMock()
 
 if get_env() == "PROD":
@@ -72,7 +76,9 @@ def get_env():
         "dbt": DbtCliClientResource(
             project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"
         ),
-        "dbt2": DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"),
+        "dbt2": DbtCliResource(
+            project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"
+        ),
         "pyspark": pyspark_resource,
         "step_launcher": ResourceDefinition.none_resource(),
         "monitor_fs": LocalFileSystem(base_dir=file_relative_path(__file__, ".")),
@@ -98,14 +104,17 @@ def get_env():
         "dbt": DbtCliClientResource(
             project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"
         ),
-        "dbt2": DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"),
+        "dbt2": DbtCliResource(
+            project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"
+        ),
         "pyspark": pyspark_resource,
         "step_launcher": db_step_launcher,
         "monitor_fs": s3FileSystem(
             region_name="us-west-2", s3_bucket="hooli-demo-branch"
         ),
         "email": ResourceDefinition.none_resource(),
-        "pipes_client": ResourceDefinition.none_resource(),
+        "pipes_databricks_client": ResourceDefinition.none_resource(),
+        "pipes_k8s_client": ResourceDefinition.none_resource(),
     },
     "PROD": {
         "io_manager": SnowflakePandasIOManager(
@@ -124,7 +133,9 @@ def get_env():
         "dbt": DbtCliClientResource(
             project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"
         ),
-        "dbt2": DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"),
+        "dbt2": DbtCliResource(
+            project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"
+        ),
         "pyspark": pyspark_resource,
         "step_launcher": db_step_launcher,
         "monitor_fs": s3FileSystem(region_name="us-west-2", s3_bucket="hooli-demo"),
@@ -135,6 +146,7 @@ def get_env():
             smtp_username=EnvVar("SMTP_USERNAME"),
             smtp_password=EnvVar("SMTP_PASSWORD"),
         ),
-        "pipes_client": PipesDatabricksClient(client)
+        "pipes_databricks_client": PipesDatabricksClient(client),
+        "pipes_k8s_client": PipesK8sClient(),
     },
-}
\ No newline at end of file
+}
diff --git a/hooli_data_eng/utils/example_container/Dockerfile b/hooli_data_eng/utils/example_container/Dockerfile
new file mode 100644
index 00000000..f286b8f7
--- /dev/null
+++ b/hooli_data_eng/utils/example_container/Dockerfile
@@ -0,0 +1,7 @@
+FROM python:3.10-slim
+
+RUN pip install dagster-pipes
+
+COPY script.py .
+
+CMD ["python", "script.py"]
diff --git a/hooli_data_eng/utils/example_container/script.py b/hooli_data_eng/utils/example_container/script.py
new file mode 100644
index 00000000..c832f04e
--- /dev/null
+++ b/hooli_data_eng/utils/example_container/script.py
@@ -0,0 +1,24 @@
+####
+# This represents an "external" script that is mostly independent of dagster
+# that dagster will orchestrate via pipes
+
+from dagster_pipes import open_dagster_pipes
+import random
+
+with open_dagster_pipes() as context:
+    sample_rate = context.get_extra("sample_rate")
+
+    # Stream log message back to Dagster
+    context.log.info(f"Using sample rate: {sample_rate}")
+
+    # ... your code that computes and persists the asset
+
+    # Stream asset materialization metadata and data version back to Dagster.
+    # This should be called after you've computed and stored the asset value. We
+    # omit the asset key here because there is only one asset in scope, but for
+    # multi-assets you can pass an `asset_key` parameter.
+    context.report_asset_materialization(
+        metadata={
+            "some_spark_metric": random.choice(["scranton", "new york", "tallahassee"])
+        },
+    )
diff --git a/hooli_data_eng/utils/external_databricks_script.py b/hooli_data_eng/utils/external_databricks_script.py
index b74674e9..30b8d7a3 100644
--- a/hooli_data_eng/utils/external_databricks_script.py
+++ b/hooli_data_eng/utils/external_databricks_script.py
@@ -1,14 +1,20 @@
 ####
-# This reprsents an "external" script that is mostly independent of dagster
+# This represents an "external" script that is mostly independent of dagster
 # that dagster will orchestrate via pipes
 
 # This script must be uploaded to Databricks manually (or via some other process)
 # `dagster_pipes` must be available in the databricks python environment
-from dagster_pipes import PipesDbfsContextLoader, PipesDbfsMessageWriter, PipesContext, open_dagster_pipes
-import random
+from dagster_pipes import (
+    PipesDbfsContextLoader,
+    PipesDbfsMessageWriter,
+    open_dagster_pipes,
+)
+import random
 
-with open_dagster_pipes(context_loader=PipesDbfsContextLoader(), message_writer=PipesDbfsMessageWriter()) as context:
-
+with open_dagster_pipes(
+    context_loader=PipesDbfsContextLoader(),
+    message_writer=PipesDbfsMessageWriter(),
+) as context:
     sample_rate = context.get_extra("sample_rate")
 
     # Stream log message back to Dagster
@@ -16,11 +22,12 @@
 
     # ... your code that computes and persists the asset
 
-
     # Stream asset materialization metadata and data version back to Dagster.
     # This should be called after you've computed and stored the asset value. We
     # omit the asset key here because there is only one asset in scope, but for
     # multi-assets you can pass an `asset_key` parameter.
     context.report_asset_materialization(
-        metadata={"some_spark_metric": random.choice(["scranton", "new york", "tallahassee"])},
-    )
\ No newline at end of file
+        metadata={
+            "some_spark_metric": random.choice(["scranton", "new york", "tallahassee"])
+        },
+    )
diff --git a/hooli_data_eng_tests/test_databricks_pipes.py b/hooli_data_eng_tests/test_databricks_pipes.py
index 3c16242b..4a9be2dd 100644
--- a/hooli_data_eng_tests/test_databricks_pipes.py
+++ b/hooli_data_eng_tests/test_databricks_pipes.py
@@ -4,7 +4,7 @@
 from dagster_databricks import PipesDatabricksClient
 
 # to meaningfully run this test you must set
-# DAGSTER_CLOUD_DEPLOYMENT_NAME="data-eng-prod" 
+# DAGSTER_CLOUD_DEPLOYMENT_NAME="data-eng-prod"
 # and also set
 # DATABRICKS_HOST="your host"
 # DATABRICKS_TOKEN="your token"
@@ -12,9 +12,9 @@
 result = materialize(
     [databricks_asset],
     resources={
-        "pipes_client": PipesDatabricksClient(
+        "pipes_databricks_client": PipesDatabricksClient(
             client,
         )
     },
     raise_on_error=False,
-)
\ No newline at end of file
+)
diff --git a/setup.py b/setup.py
index 1c2441be..ef34d3ba 100644
--- a/setup.py
+++ b/setup.py
@@ -13,7 +13,7 @@
         "scipy",
         "dbt-core",
         "dbt-duckdb",
-        "dbt-snowflake", 
+        "dbt-snowflake",
         "dagster-duckdb",
         "dagster-aws",
         "dagster-duckdb-pandas",
@@ -22,14 +22,15 @@
         "dagster-cloud",
         "dagster-pyspark",
         "dagster-databricks",
+        "dagster-k8s",
         "dagstermill",
         "gql",
         "plotnine",
-        "responses", 
+        "responses",
         "requests",
         "requests_toolbelt",
-        "html5lib",
-        "scikit-learn"
+        "html5lib",
+        "scikit-learn",
     ],
     extras_require={"dev": ["dagit", "pytest"]},
-    )
\ No newline at end of file
+    )
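
For reference, exercising the new k8s pipes asset could mirror hooli_data_eng_tests/test_databricks_pipes.py, which this patch updates. The sketch below is illustrative only and is not part of the patch; it assumes the active kubeconfig points at a cluster that can pull the latest-pipes-example image from the container registry, and that k8s_pod_asset is imported from the module modified above.

# hypothetical hooli_data_eng_tests/test_k8s_pipes.py (not included in this patch)
from dagster import materialize
from dagster_k8s import PipesK8sClient

from hooli_data_eng.assets.forecasting import k8s_pod_asset

# Run the asset once against a real cluster; raise_on_error=False mirrors the
# existing databricks pipes test so a failed pod does not raise during the run.
result = materialize(
    [k8s_pod_asset],
    resources={
        "pipes_k8s_client": PipesK8sClient(),
    },
    raise_on_error=False,
)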