Commit

moved jobs, resources, schedules, and sensor logic to their respective directories
izzye84 committed Sep 26, 2023
1 parent d9ffbda commit 7441e50
Showing 1 changed file with 10 additions and 180 deletions.
190 changes: 10 additions & 180 deletions hooli_data_eng/definitions.py
@@ -1,43 +1,20 @@
import os

from dagster_pyspark import pyspark_resource

from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets
from hooli_data_eng.assets.delayed_asset_alerts import asset_delay_alert_sensor
from hooli_data_eng.resources.sensor_file_managers import s3FileSystem, LocalFileSystem
from hooli_data_eng.resources.sensor_smtp import LocalEmailAlert, SESEmailAlert
from hooli_data_eng.resources.databricks import db_step_launcher
from hooli_data_eng.resources.api import RawDataAPI
from hooli_data_eng.jobs.watch_s3 import watch_s3_sensor
from dagster_duckdb_pandas import DuckDBPandasIOManager
from dagster_dbt import DbtCliClientResource
from hooli_data_eng.resources.dbt import DbtCli2 as DbtCli
from dagster_snowflake_pandas import SnowflakePandasIOManager
#from hooli_data_eng.resources.warehouse import MySnowflakeIOManager as SnowflakePandasIOManager
from dagster_aws.s3 import ConfigurablePickledObjectS3IOManager, S3Resource
from dagstermill import ConfigurableLocalOutputNotebookIOManager


from dagster import (
    build_schedule_from_partitioned_job,
    AssetSelection,
    Definitions,
    EnvVar,
    EventLogEntry,
    RunRequest,
    SensorEvaluationContext,
    ResourceDefinition,
    asset_sensor,
    define_asset_job,
    FilesystemIOManager,
    load_assets_from_modules,
    load_assets_from_package_module,
    AssetKey,
    AutoMaterializePolicy,
    multiprocess_executor,
)

from dagster._utils import file_relative_path
from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets
from hooli_data_eng.assets.marketing import check_avg_orders
from hooli_data_eng.assets.raw_data import check_users
from hooli_data_eng.jobs import analytics_job, predict_job
from hooli_data_eng.resources import get_env, resource_def
from hooli_data_eng.schedules import analytics_schedule
from hooli_data_eng.sensors import orders_sensor
from hooli_data_eng.sensors.delayed_asset_alerts import asset_delay_alert_sensor
from hooli_data_eng.sensors.watch_s3 import watch_s3_sensor
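
These new imports replace the inline definitions removed further down in this diff. The relocated modules themselves are not shown on this page; as a rough sketch, hooli_data_eng/resources/__init__.py plausibly exposes the same get_env helper and resource_def mapping that this commit deletes from definitions.py (the empty per-environment dicts below are placeholders for the full resource blocks visible in the removed code):

# hooli_data_eng/resources/__init__.py -- assumed layout, mirroring the code removed below
import os


def get_env() -> str:
    # Pick the resource environment from Dagster Cloud environment variables
    if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
        return "BRANCH"
    if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod":
        return "PROD"
    return "LOCAL"


# One resource dictionary per environment, as in the LOCAL / BRANCH / PROD
# blocks removed from definitions.py below
resource_def = {
    "LOCAL": {},   # DuckDB, local filesystem, local email alerts
    "BRANCH": {},  # staging Snowflake and S3, Databricks step launcher
    "PROD": {},    # production Snowflake and S3, SES email alerts
}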

# ---------------------------------------------------
# Assets
@@ -62,10 +39,6 @@
# specifies what databases to target, and locally will
# execute against a DuckDB


DBT_PROJECT_DIR = file_relative_path(__file__, "../dbt_project")
DBT_PROFILES_DIR = file_relative_path(__file__, "../dbt_project/config")

dbt_assets = load_assets_from_modules([dbt_assets])

# Our final set of assets represent Python code that
@@ -77,150 +50,6 @@

marketing_assets = load_assets_from_package_module(marketing, group_name="MARKETING")

# ---------------------------------------------------
# Resources

# Resources represent external systems; IO Managers specifically
# tell dagster where our assets should be materialized. In dagster
# resources are separate from logical code to make it possible
# to develop locally, run tests, and run integration tests
#
# This project is designed for everything to run locally
# using the file system and DuckDB as the primary development resources
#
# PRs use a "branch" environment that mirrors production with
# staging Snowflake and S3 resources
#
# The production deployment on Dagster Cloud uses production Snowflake
# and S3 resources


def get_env():
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
return "BRANCH"
if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod":
return "PROD"
return "LOCAL"


# Similar to having different dbt targets, here we create the resource
# configuration by environment

resource_def = {
"LOCAL": {
"io_manager": DuckDBPandasIOManager(
database=os.path.join(DBT_PROJECT_DIR, "example.duckdb")
),
"model_io_manager": FilesystemIOManager(),
"output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(),
"api": RawDataAPI.configure_at_launch(),
"s3": ResourceDefinition.none_resource(),
"dbt": DbtCliClientResource(
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"
),
"dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"),
"pyspark": pyspark_resource,
"step_launcher": ResourceDefinition.none_resource(),
"monitor_fs": LocalFileSystem(base_dir=file_relative_path(__file__, ".")),
"email": LocalEmailAlert(
smtp_email_to=["[email protected]"], smtp_email_from="[email protected]"
),
},
"BRANCH": {
"io_manager": SnowflakePandasIOManager(
database="DEMO_DB2_BRANCH",
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
warehouse="TINY_WAREHOUSE",
),
"model_io_manager": ConfigurablePickledObjectS3IOManager(
s3_bucket="hooli-demo-branch",
s3_resource=S3Resource(region_name="us-west-2"),
),
"output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(),
"api": RawDataAPI.configure_at_launch(),
"dbt": DbtCliClientResource(
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"
),
"dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"),
"pyspark": pyspark_resource,
"step_launcher": db_step_launcher,
"monitor_fs": s3FileSystem(
region_name="us-west-2", s3_bucket="hooli-demo-branch"
),
"email": ResourceDefinition.none_resource(),
},
"PROD": {
"io_manager": SnowflakePandasIOManager(
database="DEMO_DB2",
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
warehouse="TINY_WAREHOUSE",
),
"model_io_manager": ConfigurablePickledObjectS3IOManager(
s3_bucket="hooli-demo-branch",
s3_resource=S3Resource(region_name="us-west-2"),
),
"output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(),
"api": RawDataAPI.configure_at_launch(),
"dbt": DbtCliClientResource(
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"
),
"dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"),
"pyspark": pyspark_resource,
"step_launcher": db_step_launcher,
"monitor_fs": s3FileSystem(region_name="us-west-2", s3_bucket="hooli-demo"),
"email": SESEmailAlert(
smtp_host="email-smtp.us-west-2.amazonaws.com",
smtp_email_from="[email protected]",
smtp_email_to=["[email protected]"],
smtp_username=EnvVar("SMTP_USERNAME"),
smtp_password=EnvVar("SMTP_PASSWORD"),
),
},
}

# ---------------------------------------------------
# Jobs and Sensors

# With assets defined we have everything to run Dagster
# ourselves if we wanted to manually create assets.
# Most of the time you will want to automate asset creation.
# In dagster, jobs allow you to update all or some assets.
# Jobs can be run on a schedule, or in response to an external
# event using a sensor.

# This job updates all of the assets upstream of "orders_augmented",
# which is an asset representing a model in dbt
analytics_job = define_asset_job(
name="refresh_analytics_model_job",
selection=AssetSelection.keys(["ANALYTICS", "orders_augmented"]).upstream(),
tags={"dagster/max_retries": "1"},
# config = {"execution": {"config": {"multiprocess": {"max_concurrent": 1}}}}
)

# This schedule tells dagster to run the analytics_job daily
analytics_schedule = build_schedule_from_partitioned_job(analytics_job)

# This job selects the predicted_orders asset defined in
# assets/forecasting/__init__.py
predict_job = define_asset_job(
"predict_job",
selection=AssetSelection.keys(["FORECASTING", "predicted_orders"]),
tags={"alert_team": "ml"},
)


# This sensor listens for changes to the orders_augmented asset which
# represents a dbt model. When the table managed by dbt is updated,
# this sensor will trigger the predict_job above, ensuring that anytime
# new order data is produced the forecast is updated
@asset_sensor(asset_key=AssetKey(["ANALYTICS", "orders_augmented"]), job=predict_job)
def orders_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    yield RunRequest(run_key=context.cursor)
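
This sensor now lives in the sensors package (note the new "from hooli_data_eng.sensors import orders_sensor" import at the top of the file). A minimal sketch of what the relocated module might contain, assumed rather than shown by this diff:

# hooli_data_eng/sensors/__init__.py -- assumed contents after the move
from dagster import (
    AssetKey,
    EventLogEntry,
    RunRequest,
    SensorEvaluationContext,
    asset_sensor,
)

from hooli_data_eng.jobs import predict_job


@asset_sensor(asset_key=AssetKey(["ANALYTICS", "orders_augmented"]), job=predict_job)
def orders_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    # Re-run the forecast whenever dbt updates the orders_augmented table
    yield RunRequest(run_key=context.cursor)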

# ---------------------------------------------------
# Definitions

@@ -232,6 +61,7 @@ def orders_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
{"max_concurrent": 3}
),
assets=[*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets],
asset_checks=[check_users, check_avg_orders],
resources=resource_def[get_env()],
schedules=[analytics_schedule],
sensors=[
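
The diff is cut off at this point, so the rest of the Definitions call is not visible. Piecing together the imports added at the top of the file with the fragments shown above, the resulting object plausibly looks like the sketch below; the exact jobs and sensors lists and the executor configuration are assumptions, not confirmed by this page:

# Assumed shape of the Definitions object after this commit
defs = Definitions(
    executor=multiprocess_executor.configured({"max_concurrent": 3}),
    assets=[*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets],
    asset_checks=[check_users, check_avg_orders],
    resources=resource_def[get_env()],
    jobs=[analytics_job, predict_job],
    schedules=[analytics_schedule],
    sensors=[orders_sensor, watch_s3_sensor, asset_delay_alert_sensor],
)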