Commit

Fix Forecasting Section (#45)
* add dagster pipes example with databricks

* fix notebook
slopp authored Oct 23, 2023
1 parent e27ec16 commit db58d9b
Showing 5 changed files with 197 additions and 66 deletions.
161 changes: 120 additions & 41 deletions hooli_data_eng/assets/forecasting/__init__.py
@@ -3,86 +3,115 @@
import numpy as np
import pandas as pd
from scipy import optimize
from dagster_dbt import DbtCliResource

from dagster import (
    AssetIn,
    asset,
    MonthlyPartitionsDefinition,
    Output,
    Field,
    Int,
    Config,
    AssetExecutionContext,
)
from dagstermill import define_dagstermill_asset
from dagster._utils import file_relative_path

from dagster_databricks import PipesDatabricksClient
from databricks.sdk.service import jobs
from pydantic import Field


def model_func(x, a, b):
    return a * np.exp(b * (x / 10**18 - 1.6095))


# ----- Forecasting Assets -----
# These assets live downstream of tables created by dbt
# which are referenced by the key_prefix 'analytics',
# but otherwise can be referenced as data frames --
# the IO from the warehouse to these pandas assets is handled
# by the IO Managers

# This asset trains a model and stores the model coefficients as
# a tuple
# The hyper-parameters for the model (a_init and b_init) are
# available as parameters that can be changed in Dagster's
# Launchpad (see the modelHyperParams Config class)
# The final model coefficients are logged to Dagster
# using context.log.info


class modelHyperParams(Config):
    """Hyper parameters for the ML model with default values"""

    a_init = Field(5, description="initial value for a parameter, intercept")
    b_init = Field(5, description="initial value for b parameter, slope")


@asset(
    ins={"weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"])},
    compute_kind="scikitlearn",
    io_manager_key="model_io_manager",
)
def order_forecast_model(
    context, weekly_order_summary: pd.DataFrame, config: modelHyperParams
) -> Any:
    """Model parameters that best fit the observed data"""
    df = weekly_order_summary
    p0 = [config.a_init, config.b_init]
    coeffs = tuple(
        optimize.curve_fit(
            f=model_func,
            xdata=df.order_date.astype(np.int64),
            ydata=df.num_orders,
            p0=p0,
        )[0]
    )
    context.log.info("Starting with: " + str(p0[0]) + " and " + str(p0[1]))
    context.log.info("Ended with: " + str(coeffs[0]) + " and " + str(coeffs[1]))
    return coeffs


# This asset uses the data from the warehouse and the trained model
# coefficients to track model error by month
# The monthly error is modelled as a partitioned asset to enable
# easy backfills if the error statistic or upstream model change
# Helpful information is surfaced in dagster using the Output(... metadata)
@asset(
    ins={
        "weekly_order_summary": AssetIn(key_prefix=["ANALYTICS"]),
        "order_forecast_model": AssetIn(),
    },
    compute_kind="scikitlearn",
    key_prefix=["forecasting"],
    io_manager_key="model_io_manager",
    partitions_def=MonthlyPartitionsDefinition(start_date="2022-01-01"),
)
def model_stats_by_month(
    context,
    weekly_order_summary: pd.DataFrame,
    order_forecast_model: Tuple[float, float],
) -> Output[pd.DataFrame]:
    """Model errors by month"""
    a, b = order_forecast_model
    target_date = pd.to_datetime(context.asset_partition_key_for_output())
    target_month = target_date.month
    weekly_order_summary["order_date"] = pd.to_datetime(
        weekly_order_summary["order_date"]
    )
    weekly_order_summary["order_month"] = pd.DatetimeIndex(
        weekly_order_summary["order_date"]
    ).month
    target_orders = weekly_order_summary[
        (weekly_order_summary["order_month"] == target_month)
    ]
    date_range = pd.date_range(
        start=target_date, end=target_date + pd.DateOffset(days=30)
    )
    predicted_orders = model_func(x=date_range.astype(np.int64), a=a, b=b)
    error = sum(target_orders["num_orders"]) - sum(predicted_orders)
    context.log.info("Error for " + str(target_date) + ": " + str(error))

    return Output(pd.DataFrame({"error": [error]}), metadata={"error_obs_prds": error})


@@ -108,33 +137,83 @@ def predicted_orders(
    predicted_data = model_func(x=future_dates.astype(np.int64), a=a, b=b)
    return pd.DataFrame({"order_date": future_dates, "num_orders": predicted_data})


# This asset uses the forecasted orders to flag any days that
# surpass available capacity
# The asset uses spark which requires a pyspark resource
# and a step launcher
# Locally the pyspark session runs in a local spark context
# In branch and production, the step launcher is responsible
# for building a databricks cluster
@asset(
    ins={"predicted_orders": AssetIn(key_prefix=["FORECASTING"])},
    compute_kind="pyspark",
    key_prefix=["FORECASTING"],
    required_resource_keys={"step_launcher", "pyspark"},
    metadata={"resource_constrained_at": 50},
)
def big_orders(context, predicted_orders: pd.DataFrame):
    """Days where predicted orders surpass our current carrying capacity"""
    df = context.resources.pyspark.spark_session.createDataFrame(predicted_orders)
    return df.where(df.num_orders >= 50).toPandas()


# This asset uses a Jupyter Notebook which takes inputs from the warehouse
# and the trained model coefficients
model_nb = define_dagstermill_asset(
    name="model_nb",
    notebook_path=file_relative_path(__file__, "model.ipynb"),
    ins={
        "weekly_order_summary": AssetIn(
            key_prefix=["ANALYTICS"], dagster_type=pd.DataFrame
        ),
        "order_forecast_model": AssetIn(),
    },
    required_resource_keys={"io_manager"},
)


# This databricks pipes asset only runs in prod, see utils/external_databricks_script.py
# The dependency on predicted_orders is not a real dependency since the script does not
# rely on or use that upstream Snowflake table; it is used here for illustrative purposes
@asset(
    deps=[predicted_orders],
    compute_kind="databricks",
)
def databricks_asset(
    context: AssetExecutionContext, pipes_client: PipesDatabricksClient
):
    # cluster config
    cluster_config = {
        "num_workers": 1,
        "spark_version": "11.2.x-scala2.12",
        "node_type_id": "i3.xlarge",
    }

    # task specification will be passed to Databricks as-is, except for the
    # injection of environment variables
    task = jobs.SubmitTask.from_dict(
        {
            "new_cluster": cluster_config,
            "libraries": [
                # must include dagster-pipes
                {"pypi": {"package": "dagster-pipes"}},
            ],
            "task_key": "dagster-launched",
            "spark_python_task": {
                "python_file": "dbfs:/FileStore/external_databricks_script.py",
                "source": jobs.Source.WORKSPACE,
            },
        }
    )

    # Arbitrary json-serializable data you want access to from the `PipesSession`
    # in the Databricks runtime. Assume `sample_rate` is a parameter used by
    # the target job's business logic.
    extras = {"sample_rate": 1.0}

    # synchronously execute the databricks job
    return pipes_client.run(
        task=task,
        context=context,
        extras=extras,
    ).get_results()
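
For intuition, here is a minimal standalone sketch of the exponential fit that order_forecast_model wraps, using synthetic weekly order counts and the same initial guesses as modelHyperParams (the synthetic series and the "true" a and b values below are assumptions for illustration only):

# Standalone sketch of the curve fit wrapped by order_forecast_model.
# The synthetic order counts and the true (a, b) values are made up so that
# curve_fit has something sensible to recover; they are not from the repo.
import numpy as np
import pandas as pd
from scipy import optimize


def model_func(x, a, b):
    return a * np.exp(b * (x / 10**18 - 1.6095))


order_dates = pd.date_range("2022-01-02", periods=52, freq="W")
x = order_dates.astype(np.int64)  # nanosecond timestamps, as in the asset
rng = np.random.default_rng(0)
num_orders = model_func(x, a=10, b=5) + rng.normal(0, 0.1, len(x))

coeffs, _ = optimize.curve_fit(
    f=model_func, xdata=x, ydata=num_orders, p0=[5, 5]  # p0 mirrors a_init, b_init
)
print(coeffs)  # should land close to (10, 5)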
42 changes: 17 additions & 25 deletions hooli_data_eng/assets/forecasting/model.ipynb

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions hooli_data_eng/resources/__init__.py
@@ -16,6 +16,9 @@
from hooli_data_eng.resources.sensor_file_managers import s3FileSystem, LocalFileSystem
from hooli_data_eng.resources.sensor_smtp import LocalEmailAlert, SESEmailAlert

from databricks.sdk import WorkspaceClient
from dagster_databricks import PipesDatabricksClient
from unittest import mock

# Resources represent external systems, and specifically IO Managers
# tell dagster where our assets should be materialized. In dagster
@@ -38,6 +41,14 @@ def get_env():
        return "PROD"
    return "LOCAL"

client = mock.MagicMock()

if get_env() == "PROD":
    # Databricks Client
    client = WorkspaceClient(
        host=os.environ["DATABRICKS_HOST"],
        token=os.environ["DATABRICKS_TOKEN"],
    )

# The dbt file dbt_project/config/profiles.yaml
# specifies which databases to target, and locally will
@@ -68,6 +79,7 @@ def get_env():
        "email": LocalEmailAlert(
            smtp_email_to=["[email protected]"], smtp_email_from="[email protected]"
        ),
        "pipes_client": ResourceDefinition.none_resource(),
    },
    "BRANCH": {
        "io_manager": SnowflakePandasIOManager(
@@ -93,6 +105,7 @@ def get_env():
            region_name="us-west-2", s3_bucket="hooli-demo-branch"
        ),
        "email": ResourceDefinition.none_resource(),
        "pipes_client": ResourceDefinition.none_resource(),
    },
    "PROD": {
        "io_manager": SnowflakePandasIOManager(
@@ -122,5 +135,6 @@ def get_env():
            smtp_username=EnvVar("SMTP_USERNAME"),
            smtp_password=EnvVar("SMTP_PASSWORD"),
        ),
        "pipes_client": PipesDatabricksClient(client),
    },
}
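
The "LOCAL", "BRANCH", and "PROD" resource dictionaries above are presumably selected by get_env() when the code location's Definitions object is built. A minimal sketch of that wiring (the resource_def name and the exact asset list are assumptions, not taken from this diff):

# Hypothetical wiring: pick the environment-specific resources when building
# the Dagster Definitions object. resource_def stands in for whatever name the
# module gives the {"LOCAL": ..., "BRANCH": ..., "PROD": ...} dict above.
from dagster import Definitions

from hooli_data_eng.assets.forecasting import databricks_asset
from hooli_data_eng.resources import get_env, resource_def

defs = Definitions(
    assets=[databricks_asset],          # plus the rest of the asset graph
    resources=resource_def[get_env()],  # "LOCAL", "BRANCH", or "PROD"
)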
26 changes: 26 additions & 0 deletions hooli_data_eng/utils/external_databricks_script.py
@@ -0,0 +1,26 @@
####
# This represents an "external" script that is mostly independent of dagster
# that dagster will orchestrate via pipes
# This script must be uploaded to Databricks manually (or via some other process)
# `dagster_pipes` must be available in the databricks python environment

from dagster_pipes import PipesDbfsContextLoader, PipesDbfsMessageWriter, PipesContext, open_dagster_pipes
import random

with open_dagster_pipes(context_loader=PipesDbfsContextLoader(), message_writer=PipesDbfsMessageWriter()) as context:

    sample_rate = context.get_extra("sample_rate")

    # Stream log message back to Dagster
    context.log.info(f"Using sample rate: {sample_rate}")

    # ... your code that computes and persists the asset

    # Stream asset materialization metadata and data version back to Dagster.
    # This should be called after you've computed and stored the asset value. We
    # omit the asset key here because there is only one asset in scope, but for
    # multi-assets you can pass an `asset_key` parameter.
    context.report_asset_materialization(
        metadata={"some_spark_metric": random.choice(["scranton", "new york", "tallahassee"])},
    )
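
The header above says the script must be uploaded to Databricks manually or by some other process. One possible approach, sketched here assuming the databricks-sdk DBFS upload helper and the same dbfs:/FileStore path referenced in the SubmitTask spec:

# Hypothetical upload step: push the Pipes script to DBFS so the Databricks job
# can reference it at dbfs:/FileStore/external_databricks_script.py.
# Assumes DATABRICKS_HOST and DATABRICKS_TOKEN are set in the environment.
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
with open("hooli_data_eng/utils/external_databricks_script.py", "rb") as f:
    w.dbfs.upload("/FileStore/external_databricks_script.py", f, overwrite=True)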
20 changes: 20 additions & 0 deletions hooli_data_eng_tests/test_databricks_pipes.py
@@ -0,0 +1,20 @@
from dagster import materialize
from hooli_data_eng.assets.forecasting import databricks_asset
from hooli_data_eng.resources import client
from dagster_databricks import PipesDatabricksClient

# to meaningfully run this test you must set
# DAGSTER_CLOUD_DEPLOYMENT_NAME="data-eng-prod"
# and also set
# DATABRICKS_HOST="your host"
# DATABRICKS_TOKEN="your token"

result = materialize(
    [databricks_asset],
    resources={
        "pipes_client": PipesDatabricksClient(
            client,
        )
    },
    raise_on_error=False,
)
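
Because raise_on_error=False, a failed Databricks run will not raise here. A small follow-up check on the returned result (a sketch; materialize returns an ExecuteInProcessResult) makes the outcome explicit:

# Optional follow-up: surface the run outcome explicitly, since
# raise_on_error=False suppresses exceptions from a failed materialization.
if not result.success:
    raise SystemExit("databricks_asset materialization failed")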
