Dbt new version #38

Merged · 2 commits · Aug 2, 2023
11 changes: 5 additions & 6 deletions README.md
@@ -16,24 +16,23 @@ dagster dev

To understand the structure, start with the file `hooli_data_eng/definitions.py`. This example includes a few key Dagster concepts:

- *Assets*: are used to represent the datasets the Hooli data team manages. This example includes assets generated from dbt and Python.
- *Assets*: are used to represent the datasets the Hooli data team manages. This example includes assets generated from dbt, Python, and other sources.
- *Resources*: represent external systems. This example uses different resources for different environments (DuckDB locally, Snowflake + S3 in production). The example also shows how to create custom resources; see `resources/api.py`.
- *Jobs*: allow us to automate when our assets are updated. This example includes jobs that run on a *Schedule* and *Sensors* that trigger jobs to run when upstream data is ready. Jobs can target assets (see `definitions.py`) or define imperative operations (see `jobs/watch_s3.py`).
- *Job Configuration* allows Dagster to parameterize tasks; this example includes a forecasting model with hyperparameters passed as job config.
- *Partitions and Backfills* allow Dagster to represent partitioned data with no additional code. This example includes a partitioned model stats asset (`hooli_data_eng/assets/forecasting/__init__.py`). Using partitions allows the Hooli data team to track model performance over time, account for model drift, and run backtests for new models.
- *Partitions and Backfills* allow Dagster to represent partitioned data with no additional code. This example shows how daily partitioned assets can automatically be scheduled daily, and how those same daily partitions can seamlessly roll up into a weekly partitioned asset.
- The asset `big_orders` in `hooli_data_eng/assets/forecasting/__init__.py` uses Spark. Locally, Spark is run through a local PySpark process. In production, a Databricks *Step Launcher* (`resources/databricks.py`) is used to dynamically create a Spark cluster for processing.
- The asset `model_nb` is an example of *Dagstermill*, which lets you run Jupyter Notebooks as assets, including notebooks that take upstream assets as inputs.
- *Sensors* are used to run jobs based on external events. See for example `hooli_data_eng/jobs/watch_s3.py`.
- *Declarative Scheduling* is used to keep certain marketing and analytics assets up to date based on a stakeholder SLA using freshness policies and an asset reconciliation sensor. Examples include `hooli_data_eng/assets/marketing/__init__.py` and `dbt_project/models/ANALYTICS/weekly_order_summary.sql`.
- *Declarative Scheduling* is used to keep certain marketing and analytics assets up to date based on a stakeholder SLA using freshness policies and auto-materialization policies. Examples include `hooli_data_eng/assets/marketing/__init__.py` and `dbt_project/models/ANALYTICS/weekly_order_summary.sql`.
- *Retries* are enabled for both runs and assets, making the pipeline robust to occasional flakiness. See `hooli_data_eng/definitions.py` for examples of retries on jobs, and `hooli_data_eng/assets/marketing/__init__.py` for an example of a more complex retry policy on an asset, including backoff and jitter (see the sketch following this README diff). Flakiness is generated in `hooli_data_eng/resources/api.py`.
- *Alerts* are enabled through Dagster Cloud alert policies based on job tags. A custom alert is also specified to notify when assets with SLAs are later than expected. See `hooli_data_eng/assets/delayed_asset_alerts.py`.

## Deployment Architecture

This repository uses the Dagster Cloud Hybrid architecture with GitHub Actions to provide CI/CD.
- The main branch is deployed to Dagster Cloud using `.github/workflows/deploy.yaml`. On each commit, a new Docker image is built and pushed to our container registry. These images are deployed into an EKS cluster by our running Dagster Agent, which also synchronizes changes with Dagster Cloud.

- The open PR in this repository shows how Dagster supports full integration testing with a *branch deployment*; in this case, the PR contains code for a second "competing" model. This change also highlights how you can test dependency changes. The action `.github/workflows/branch_deploy.yaml` creates the isolated branch deployment for each PR.
- The main branch is deployed to Dagster Cloud using the workflow in `.github/workflows/`. On each commit, a new Docker image is built and pushed to our container registry. These images are deployed into an EKS cluster by our running Dagster Agent, which also synchronizes changes with Dagster Cloud.

- The open PR in this repository shows how Dagster supports full integration testing with a *branch deployment*; in this case, the PR contains code for a second "competing" model. This change also highlights how you can test dependency changes. This capability is also implemented in the GitHub Action in this repo.

*Dev Notes in the Repo Wiki*
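
For readers skimming this diff, the partition and retry concepts the README lists can be summarized with a minimal, self-contained sketch. This is not code from the repo: the asset name, start date, and retry parameters are illustrative assumptions.

```python
import pandas as pd
from dagster import Backoff, DailyPartitionsDefinition, Jitter, RetryPolicy, asset

# illustrative start date; the repo's partitions start elsewhere
daily_partitions = DailyPartitionsDefinition(start_date="2023-05-25")

@asset(
    partitions_def=daily_partitions,
    # retry transient failures with exponential backoff plus jitter,
    # mirroring the "more complex retry policy" the README mentions
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=2,  # seconds before the first retry
        backoff=Backoff.EXPONENTIAL,
        jitter=Jitter.PLUS_MINUS,
    ),
    compute_kind="pandas",
)
def example_daily_kpi(context) -> pd.DataFrame:
    """Hypothetical daily KPI computed for the partition being materialized."""
    return pd.DataFrame({"date": [context.partition_key], "value": [1]})
```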
2 changes: 1 addition & 1 deletion dbt_project/models/ANALYTICS/order_stats.sql
@@ -1,6 +1,6 @@
{{
config(
dagster_auto_materialize_policy={"type":"lazy"}
dagster_auto_materialize_policy={"type":"eager"}
)
}}
select
2 changes: 1 addition & 1 deletion dbt_project/models/ANALYTICS/weekly_order_summary.sql
@@ -1,7 +1,7 @@

{{
config(
dagster_auto_materialize_policy={"type":"lazy"},
dagster_auto_materialize_policy={"type":"eager"},
dagster_freshness_policy={"cron_schedule": "0 9 * * MON", "maximum_lag_minutes": (24+9)*60}
)
}}
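
Both SQL diffs above flip the Dagster auto-materialize policy on these dbt models from `lazy` to `eager`. In plain Dagster terms, the difference is roughly as follows (asset names and the freshness window are illustrative; the real policies are attached to the dbt models via the `config()` blocks shown above):

```python
from dagster import AutoMaterializePolicy, FreshnessPolicy, asset

@asset(auto_materialize_policy=AutoMaterializePolicy.eager())
def eager_summary():
    # an eager asset is re-materialized whenever its upstream assets update
    ...

@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    # a lazy asset is only materialized when needed to satisfy a freshness
    # policy on itself or on something downstream of it
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
)
def lazy_summary():
    ...
```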
90 changes: 48 additions & 42 deletions hooli_data_eng/assets/dbt_assets.py
@@ -1,12 +1,14 @@

from typing import Any, Mapping
from dagster._utils import file_relative_path
from dagster_dbt import DbtCliResource, DagsterDbtTranslator
from dagster_dbt import load_assets_from_dbt_project
from dagster_dbt.asset_decorator import dbt_assets
from dagster_dbt.core import DbtCli, DbtManifest
from dagster import AssetKey, DailyPartitionsDefinition, WeeklyPartitionsDefinition, OpExecutionContext, Output
from dagster import AssetKey, DailyPartitionsDefinition, WeeklyPartitionsDefinition, OpExecutionContext, Output, MetadataValue
from dateutil import parser
import json
import textwrap
from pathlib import Path

# many dbt assets use an incremental approach to avoid
# re-processing all data on each run
@@ -19,63 +21,64 @@
DBT_PROFILES_DIR = file_relative_path(__file__, "../../dbt_project/config")


DBT_MANIFEST = file_relative_path(__file__, "../../dbt_project/target/manifest.json")
DBT_MANIFEST = Path(file_relative_path(__file__, "../../dbt_project/target/manifest.json"))

class CustomizedDbtManifest(DbtManifest):
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str:

@classmethod
def node_info_to_metadata(cls, node_info: Mapping[str, Any]) -> Mapping[str, Any]:
metadata = {"partition_expr": "order_date"}

if node_info['name'] == 'orders_cleaned':
metadata = {"partition_expr": "dt"}

if node_info['name'] == 'users_cleaned':
metadata = {"partition_expr": "created_at"}

return metadata
description = f"dbt model for: {dbt_resource_props['name']} \n \n"

@classmethod
def node_info_to_asset_key(cls, node_info: Mapping[str, Any]) -> AssetKey:
node_path = node_info['path']
return description + textwrap.indent(dbt_resource_props.get("raw_code", ""), "\t")

def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
node_path = dbt_resource_props['path']
prefix = node_path.split('/')[0]

if node_path == 'models/sources.yml':
prefix = "RAW_DATA"

return AssetKey([prefix, node_info['name']])
return AssetKey([prefix, dbt_resource_props['name']])

@classmethod
def node_info_to_description(cls, node_info: Mapping[str, Any]) -> str:
description_sections = []

description = node_info.get("description", "")
if description:
description_sections.append(description)

raw_code = node_info.get("raw_code", "")
if raw_code:
description_sections.append(f"#### Raw SQL:\n```\n{raw_code}\n```")
def get_group_name(
self, dbt_resource_props: Mapping[str, Any]
):

node_path = dbt_resource_props['path']
prefix = node_path.split('/')[0]

if node_path == 'models/sources.yml':
prefix = "RAW_DATA"

return "\n\n".join(description_sections)
return prefix


def get_metadata(
self, dbt_resource_props: Mapping[str, Any]
) -> Mapping[str, Any]:
metadata = {"partition_expr": "order_date"}

if dbt_resource_props['name'] == 'orders_cleaned':
metadata = {"partition_expr": "dt"}

if dbt_resource_props['name'] == 'users_cleaned':
metadata = {"partition_expr": "created_at"}

manifest = CustomizedDbtManifest.read(path=DBT_MANIFEST)
return metadata

def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCli):
def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
# map partition key range to dbt vars
first_partition, last_partition = context.asset_partitions_time_window_for_output(list(context.selected_output_names)[0])
dbt_vars = {"min_date": str(first_partition), "max_date": str(last_partition)}
dbt_args = ["run", "--vars", json.dumps(dbt_vars)]

dbt_cli_task = dbt2.cli(dbt_args, manifest=manifest, context=context)
dbt_cli_task = dbt2.cli(dbt_args, context=context)

dbt_events = list(dbt_cli_task.stream_raw_events())

for event in dbt_events:
# add custom metadata to the asset materialization event
context.log.info(event)
for dagster_event in event.to_default_asset_events(manifest=manifest):
for dagster_event in event.to_default_asset_events(manifest=dbt_cli_task.manifest):

if isinstance(dagster_event, Output):
event_node_info = event.raw_event["data"]["node_info"]
@@ -102,19 +105,21 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCli):


@dbt_assets(
manifest=manifest,
manifest=DBT_MANIFEST,
select="orders_cleaned users_cleaned orders_augmented",
partitions_def=daily_partitions,
dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def daily_dbt_assets(context: OpExecutionContext, dbt2: DbtCli):
def daily_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
yield from _process_partitioned_dbt_assets(context=context, dbt2=dbt2)

@dbt_assets(
manifest=manifest,
manifest=DBT_MANIFEST,
select="weekly_order_summary order_stats",
partitions_def=weekly_partitions
partitions_def=weekly_partitions,
dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCli):
def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
yield from _process_partitioned_dbt_assets(context=context, dbt2=dbt2)


@@ -123,7 +128,8 @@ def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCli):
DBT_PROFILES_DIR,
key_prefix=["ANALYTICS"],
source_key_prefix="ANALYTICS",
select="company_perf sku_stats company_stats"
select="company_perf sku_stats company_stats",
node_info_to_group_fn= lambda x: "ANALYTICS"
)


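
The changes above migrate from the experimental `DbtCli`/`DbtManifest` APIs to `DbtCliResource` plus a manifest path handed straight to `@dbt_assets`, with asset keys, groups, and metadata customized through a `DagsterDbtTranslator`. A minimal wiring sketch of the new API, assuming the repo's `dbt2` resource key (the project directory and manifest path are illustrative):

```python
from pathlib import Path

from dagster import Definitions, OpExecutionContext
from dagster_dbt import DbtCliResource
from dagster_dbt.asset_decorator import dbt_assets

DBT_MANIFEST = Path("dbt_project/target/manifest.json")  # illustrative path

@dbt_assets(manifest=DBT_MANIFEST)
def my_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
    # stream dbt events back to Dagster as asset materializations
    yield from dbt2.cli(["run"], context=context).stream()

defs = Definitions(
    assets=[my_dbt_assets],
    resources={"dbt2": DbtCliResource(project_dir="dbt_project")},
)
```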
6 changes: 4 additions & 2 deletions hooli_data_eng/assets/marketing/__init__.py
@@ -10,7 +10,8 @@
freshness_policy=FreshnessPolicy(maximum_lag_minutes=24*60),
auto_materialize_policy=AutoMaterializePolicy.lazy(),
compute_kind="pandas",
op_tags={"owner": "[email protected]"}
op_tags={"owner": "[email protected]"},
ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])}
)
def avg_orders(company_perf: pd.DataFrame) -> pd.DataFrame:
""" Computes avg order KPI, must be updated regularly for exec dashboard """
@@ -26,7 +27,8 @@ def avg_orders(company_perf: pd.DataFrame) -> pd.DataFrame:
compute_kind="snowflake",
metadata={
"owner": "[email protected]"
}
},
ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])}
)
def min_order(context, company_perf: pd.DataFrame) -> pd.DataFrame:
""" Computes min order KPI """
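
With the translator change, the dbt assets now carry key prefixes (for example `ANALYTICS`), so downstream Python assets must map their inputs to the prefixed keys explicitly via `AssetIn`, as the marketing diff above does. A stripped-down sketch of the pattern (the column name is an illustrative assumption):

```python
import pandas as pd
from dagster import AssetIn, asset

@asset(
    # map the "company_perf" argument to the asset key ["ANALYTICS", "company_perf"]
    ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])},
)
def avg_orders(company_perf: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({"avg_orders": [company_perf["n_orders"].mean()]})
```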
1 change: 1 addition & 0 deletions hooli_data_eng/assets/raw_data/__init__.py
@@ -34,6 +34,7 @@ def users(context, api: RawDataAPI) -> pd.DataFrame:
users = pd.read_json(resp.json())
all_users.append(users)


return pd.concat(all_users)


10 changes: 5 additions & 5 deletions hooli_data_eng/definitions.py
@@ -12,8 +12,8 @@
from dagster_duckdb_pandas import DuckDBPandasIOManager
from dagster_dbt import DbtCliClientResource
from hooli_data_eng.resources.dbt import DbtCli2 as DbtCli
#from dagster_snowflake_pandas import SnowflakePandasIOManager
from hooli_data_eng.resources.warehouse import MySnowflakeIOManager as SnowflakePandasIOManager
from dagster_snowflake_pandas import SnowflakePandasIOManager
#from hooli_data_eng.resources.warehouse import MySnowflakeIOManager as SnowflakePandasIOManager
from dagster_aws.s3 import ConfigurablePickledObjectS3IOManager, S3Resource
from dagstermill import ConfigurableLocalOutputNotebookIOManager

@@ -235,9 +235,9 @@ def orders_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
resources=resource_def[get_env()],
schedules=[analytics_schedule],
sensors=[
orders_sensor,
# watch_s3_sensor,
asset_delay_alert_sensor,
orders_sensor,
watch_s3_sensor,
asset_delay_alert_sensor,
],
jobs=[analytics_job, predict_job],
)
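
The definitions diff swaps the hand-rolled Snowflake IO manager back to the off-the-shelf `SnowflakePandasIOManager` and re-enables `watch_s3_sensor`. The environment-switching pattern these resources plug into looks roughly like the sketch below; the `get_env` heuristic, database, and warehouse names are illustrative, not the repo's exact values.

```python
import os

from dagster import EnvVar
from dagster_duckdb_pandas import DuckDBPandasIOManager
from dagster_snowflake_pandas import SnowflakePandasIOManager

def get_env() -> str:
    # illustrative: the real repo also distinguishes branch deployments
    return "PROD" if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME") else "LOCAL"

resource_def = {
    "LOCAL": {"io_manager": DuckDBPandasIOManager(database="example.duckdb")},
    "PROD": {
        "io_manager": SnowflakePandasIOManager(
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            user=EnvVar("SNOWFLAKE_USER"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            database="ANALYTICS",        # illustrative
            warehouse="TINY_WAREHOUSE",  # illustrative
        )
    },
}

# Definitions(...) then selects resource_def[get_env()], as in the diff above.
```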
8 changes: 3 additions & 5 deletions hooli_data_eng/resources/dbt.py
@@ -1,17 +1,15 @@
from dagster_dbt.core import DbtCli
from dagster_dbt.asset_decorator import DbtManifest
from dagster_dbt import DbtCliResource
from typing import List, Optional
from dagster import OpExecutionContext

class DbtCli2(DbtCli):
class DbtCli2(DbtCliResource):
profiles_dir: str

def cli(self, args: List[str],
*,
manifest: DbtManifest,
context: Optional[OpExecutionContext] = None):

args = [*args, "--profiles-dir", self.profiles_dir]

return super().cli(args=args, manifest=manifest, context=context)
return super().cli(args=args, context=context)
