From 81bc918c9ee82ea6f2b889794a1af346d42caaf8 Mon Sep 17 00:00:00 2001 From: Sean Lopp Date: Tue, 7 Nov 2023 16:53:37 -0700 Subject: [PATCH] clean up dbt --- hooli_data_eng/assets/dbt_assets.py | 57 ++++++--------------------- hooli_data_eng/resources/__init__.py | 10 ++--- hooli_data_eng/resources/dbt.py | 2 + hooli_data_eng/resources/warehouse.py | 2 + 4 files changed, 20 insertions(+), 51 deletions(-) diff --git a/hooli_data_eng/assets/dbt_assets.py b/hooli_data_eng/assets/dbt_assets.py index 8fba7bc..b4af6b3 100644 --- a/hooli_data_eng/assets/dbt_assets.py +++ b/hooli_data_eng/assets/dbt_assets.py @@ -39,7 +39,8 @@ DBT_PROJECT_DIR = file_relative_path(__file__, "../../dbt_project") DBT_PROFILES_DIR = file_relative_path(__file__, "../../dbt_project/config") - +# this manifest is created at build/deploy time, see the Makefile & .github/workflows/deploy-dagster-cloud.yml#70 +# see also: https://docs.dagster.io/integrations/dbt/reference#deploying-a-dagster-project-with-a-dbt-project DBT_MANIFEST = Path( file_relative_path(__file__, "../../dbt_project/target/manifest.json") ) @@ -90,13 +91,13 @@ def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, An metadata = {"partition_expr": "order_date"} if dbt_resource_props["name"] == "orders_cleaned": - metadata = {"partition_expr": "dt"} + metadata = {"partition_expr": "dt", "owner":"data@hooli.org"} if dbt_resource_props["name"] == "users_cleaned": - metadata = {"partition_expr": "created_at"} + metadata = {"partition_expr": "created_at", "owner":"data@hooli.org"} if dbt_resource_props["name"] in ["company_perf", "sku_stats", "company_stats"]: - metadata = {} + metadata = {"owner":"bi@hooli.org"} default_metadata = default_metadata_from_dbt_resource_props(dbt_resource_props) @@ -114,7 +115,7 @@ def get_auto_materialize_policy( ): return allow_outdated_and_missing_parents_policy -def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource): +def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): # map partition key range to dbt vars first_partition, last_partition = context.asset_partitions_time_window_for_output( list(context.selected_output_names)[0] @@ -122,42 +123,9 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCliRes dbt_vars = {"min_date": str(first_partition), "max_date": str(last_partition)} dbt_args = ["run", "--vars", json.dumps(dbt_vars)] - dbt_cli_task = dbt2.cli(dbt_args, context=context) - - # This function adds model start and end time to derive total execution time - def handle_dbt_event(event: DbtCliEventMessage) -> Generator[Union[Output, AssetObservation, AssetCheckResult], None, None]: - for dagster_event in event.to_default_asset_events( - manifest=dbt_cli_task.manifest - ): - if isinstance(dagster_event, Output): - event_node_info = event.raw_event["data"]["node_info"] - - started_at = parser.isoparse(event_node_info["node_started_at"]) - completed_at = parser.isoparse(event_node_info["node_finished_at"]) - metadata = { - "Execution Started At": started_at.isoformat(timespec="seconds"), - "Execution Completed At": completed_at.isoformat( - timespec="seconds" - ), - "Execution Duration": (completed_at - started_at).total_seconds(), - "Owner": "data@hooli.com", - } - - context.add_output_metadata( - metadata=metadata, - output_name=dagster_event.output_name, - ) - yield dagster_event - - # This function emits an AssetObservation with the dbt model's invocation ID and unique ID (needed for Snowflake Insights) - def handle_all_dbt_events(dbt_cli_task: DbtCliInvocation) -> Generator[Union[Output, AssetObservation, AssetCheckResult], None, None]: - for raw_event in dbt_cli_task.stream_raw_events(): - yield from handle_dbt_event(raw_event) - - yield from dbt_with_snowflake_insights(context, dbt_cli_task, dagster_events=handle_all_dbt_events(dbt_cli_task)) - - if not dbt_cli_task.is_successful(): - raise Exception("dbt command failed, see preceding events") + dbt_cli_task = dbt.cli(dbt_args, context=context) + + yield from dbt_with_snowflake_insights(context, dbt_cli_task) @dbt_assets( @@ -168,7 +136,7 @@ def handle_all_dbt_events(dbt_cli_task: DbtCliInvocation) -> Generator[Union[Out backfill_policy=BackfillPolicy.single_run(), ) def daily_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource): - yield from _process_partitioned_dbt_assets(context=context, dbt2=dbt2) + yield from _process_partitioned_dbt_assets(context=context, dbt=dbt2) @dbt_assets( @@ -179,15 +147,12 @@ def daily_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource): backfill_policy=BackfillPolicy.single_run(), ) def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource): - yield from _process_partitioned_dbt_assets(context=context, dbt2=dbt2) + yield from _process_partitioned_dbt_assets(context=context, dbt=dbt2) dbt_views = load_assets_from_dbt_project( DBT_PROJECT_DIR, DBT_PROFILES_DIR, - #key_prefix=["ANALYTICS"], - #source_key_prefix="ANALYTICS", select="company_perf sku_stats company_stats", - #node_info_to_group_fn=lambda x: "ANALYTICS", dagster_dbt_translator=CustomDagsterDbtTranslatorForViews() ) diff --git a/hooli_data_eng/resources/__init__.py b/hooli_data_eng/resources/__init__.py index f789d7b..7f809ac 100644 --- a/hooli_data_eng/resources/__init__.py +++ b/hooli_data_eng/resources/__init__.py @@ -3,7 +3,7 @@ from dagster import EnvVar, FilesystemIOManager, ResourceDefinition from dagster._utils import file_relative_path from dagster_aws.s3 import ConfigurablePickledObjectS3IOManager, S3Resource -from dagster_dbt import DbtCliClientResource +from dagster_dbt import DbtCliClientResource, DbtCliResource from dagster_duckdb_pandas import DuckDBPandasIOManager from dagster_pyspark import pyspark_resource from dagster_snowflake_pandas import SnowflakePandasIOManager @@ -11,7 +11,7 @@ from hooli_data_eng.resources.api import RawDataAPI from hooli_data_eng.resources.databricks import db_step_launcher -from hooli_data_eng.resources.dbt import DbtCli2 as DbtCli +#from hooli_data_eng.resources.dbt import DbtCli2 as DbtCli #from hooli_data_eng.resources.warehouse import MySnowflakeIOManager as SnowflakePandasIOManager from hooli_data_eng.resources.sensor_file_managers import s3FileSystem, LocalFileSystem from hooli_data_eng.resources.sensor_smtp import LocalEmailAlert, SESEmailAlert @@ -72,7 +72,7 @@ def get_env(): "dbt": DbtCliClientResource( project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL" ), - "dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"), + "dbt2": DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"), "pyspark": pyspark_resource, "step_launcher": ResourceDefinition.none_resource(), "monitor_fs": LocalFileSystem(base_dir=file_relative_path(__file__, ".")), @@ -98,7 +98,7 @@ def get_env(): "dbt": DbtCliClientResource( project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH" ), - "dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"), + "dbt2": DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"), "pyspark": pyspark_resource, "step_launcher": db_step_launcher, "monitor_fs": s3FileSystem( @@ -124,7 +124,7 @@ def get_env(): "dbt": DbtCliClientResource( project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD" ), - "dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"), + "dbt2": DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"), "pyspark": pyspark_resource, "step_launcher": db_step_launcher, "monitor_fs": s3FileSystem(region_name="us-west-2", s3_bucket="hooli-demo"), diff --git a/hooli_data_eng/resources/dbt.py b/hooli_data_eng/resources/dbt.py index 5c5adee..2c9cc51 100644 --- a/hooli_data_eng/resources/dbt.py +++ b/hooli_data_eng/resources/dbt.py @@ -2,6 +2,8 @@ from typing import List, Optional from dagster import OpExecutionContext +# NO LONGER USED IN PROJECT, BUT EXAMPLE OF CUSTOMIZING AN INTEGRATION RESOURCE + class DbtCli2(DbtCliResource): profiles_dir: str diff --git a/hooli_data_eng/resources/warehouse.py b/hooli_data_eng/resources/warehouse.py index e0f9805..dd90d67 100644 --- a/hooli_data_eng/resources/warehouse.py +++ b/hooli_data_eng/resources/warehouse.py @@ -10,6 +10,8 @@ from dagster._core.execution.context.output import OutputContext from typing import Sequence +# NO LONGER USED IN PROJECT, BUT EXAMPLE OF CUSTOMIZING AN INTEGRATION RESOURCE + class MyDBIOManager(DbIOManager): def _get_table_slice(self, context, output_context: OutputContext):