Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clean up dbt #51

Merged
merged 1 commit into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 11 additions & 46 deletions hooli_data_eng/assets/dbt_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
DBT_PROJECT_DIR = file_relative_path(__file__, "../../dbt_project")
DBT_PROFILES_DIR = file_relative_path(__file__, "../../dbt_project/config")


# This manifest is created at build/deploy time; see the Makefile and .github/workflows/deploy-dagster-cloud.yml#70
# see also: https://docs.dagster.io/integrations/dbt/reference#deploying-a-dagster-project-with-a-dbt-project
DBT_MANIFEST = Path(
file_relative_path(__file__, "../../dbt_project/target/manifest.json")
)
Expand Down Expand Up @@ -90,13 +91,13 @@ def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, An
metadata = {"partition_expr": "order_date"}

if dbt_resource_props["name"] == "orders_cleaned":
metadata = {"partition_expr": "dt"}
metadata = {"partition_expr": "dt", "owner":"[email protected]"}

if dbt_resource_props["name"] == "users_cleaned":
metadata = {"partition_expr": "created_at"}
metadata = {"partition_expr": "created_at", "owner":"[email protected]"}

if dbt_resource_props["name"] in ["company_perf", "sku_stats", "company_stats"]:
metadata = {}
metadata = {"owner":"[email protected]"}

default_metadata = default_metadata_from_dbt_resource_props(dbt_resource_props)

Expand All @@ -114,50 +115,17 @@ def get_auto_materialize_policy(
):
return allow_outdated_and_missing_parents_policy

def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
# map partition key range to dbt vars
first_partition, last_partition = context.asset_partitions_time_window_for_output(
list(context.selected_output_names)[0]
)
dbt_vars = {"min_date": str(first_partition), "max_date": str(last_partition)}
dbt_args = ["run", "--vars", json.dumps(dbt_vars)]

dbt_cli_task = dbt2.cli(dbt_args, context=context)

# This function adds model start and end time to derive total execution time
def handle_dbt_event(event: DbtCliEventMessage) -> Generator[Union[Output, AssetObservation, AssetCheckResult], None, None]:
for dagster_event in event.to_default_asset_events(
manifest=dbt_cli_task.manifest
):
if isinstance(dagster_event, Output):
event_node_info = event.raw_event["data"]["node_info"]

started_at = parser.isoparse(event_node_info["node_started_at"])
completed_at = parser.isoparse(event_node_info["node_finished_at"])
metadata = {
"Execution Started At": started_at.isoformat(timespec="seconds"),
"Execution Completed At": completed_at.isoformat(
timespec="seconds"
),
"Execution Duration": (completed_at - started_at).total_seconds(),
"Owner": "[email protected]",
}

context.add_output_metadata(
metadata=metadata,
output_name=dagster_event.output_name,
)
yield dagster_event

# This function emits an AssetObservation with the dbt model's invocation ID and unique ID (needed for Snowflake Insights)
def handle_all_dbt_events(dbt_cli_task: DbtCliInvocation) -> Generator[Union[Output, AssetObservation, AssetCheckResult], None, None]:
for raw_event in dbt_cli_task.stream_raw_events():
yield from handle_dbt_event(raw_event)

yield from dbt_with_snowflake_insights(context, dbt_cli_task, dagster_events=handle_all_dbt_events(dbt_cli_task))

if not dbt_cli_task.is_successful():
raise Exception("dbt command failed, see preceding events")
dbt_cli_task = dbt.cli(dbt_args, context=context)

yield from dbt_with_snowflake_insights(context, dbt_cli_task)


@dbt_assets(
Expand All @@ -168,7 +136,7 @@ def handle_all_dbt_events(dbt_cli_task: DbtCliInvocation) -> Generator[Union[Out
backfill_policy=BackfillPolicy.single_run(),
)
def daily_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
yield from _process_partitioned_dbt_assets(context=context, dbt2=dbt2)
yield from _process_partitioned_dbt_assets(context=context, dbt=dbt2)


@dbt_assets(
Expand All @@ -179,15 +147,12 @@ def daily_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
backfill_policy=BackfillPolicy.single_run(),
)
def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
yield from _process_partitioned_dbt_assets(context=context, dbt2=dbt2)
yield from _process_partitioned_dbt_assets(context=context, dbt=dbt2)


dbt_views = load_assets_from_dbt_project(
DBT_PROJECT_DIR,
DBT_PROFILES_DIR,
#key_prefix=["ANALYTICS"],
#source_key_prefix="ANALYTICS",
select="company_perf sku_stats company_stats",
#node_info_to_group_fn=lambda x: "ANALYTICS",
dagster_dbt_translator=CustomDagsterDbtTranslatorForViews()
)
10 changes: 5 additions & 5 deletions hooli_data_eng/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
from dagster import EnvVar, FilesystemIOManager, ResourceDefinition
from dagster._utils import file_relative_path
from dagster_aws.s3 import ConfigurablePickledObjectS3IOManager, S3Resource
from dagster_dbt import DbtCliClientResource
from dagster_dbt import DbtCliClientResource, DbtCliResource
from dagster_duckdb_pandas import DuckDBPandasIOManager
from dagster_pyspark import pyspark_resource
from dagster_snowflake_pandas import SnowflakePandasIOManager
from dagstermill import ConfigurableLocalOutputNotebookIOManager

from hooli_data_eng.resources.api import RawDataAPI
from hooli_data_eng.resources.databricks import db_step_launcher
from hooli_data_eng.resources.dbt import DbtCli2 as DbtCli
#from hooli_data_eng.resources.dbt import DbtCli2 as DbtCli
#from hooli_data_eng.resources.warehouse import MySnowflakeIOManager as SnowflakePandasIOManager
from hooli_data_eng.resources.sensor_file_managers import s3FileSystem, LocalFileSystem
from hooli_data_eng.resources.sensor_smtp import LocalEmailAlert, SESEmailAlert
Expand Down Expand Up @@ -72,7 +72,7 @@ def get_env():
"dbt": DbtCliClientResource(
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"
),
"dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"),
"dbt2": DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"),
"pyspark": pyspark_resource,
"step_launcher": ResourceDefinition.none_resource(),
"monitor_fs": LocalFileSystem(base_dir=file_relative_path(__file__, ".")),
Expand All @@ -98,7 +98,7 @@ def get_env():
"dbt": DbtCliClientResource(
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"
),
"dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"),
"dbt2": DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"),
"pyspark": pyspark_resource,
"step_launcher": db_step_launcher,
"monitor_fs": s3FileSystem(
Expand All @@ -124,7 +124,7 @@ def get_env():
"dbt": DbtCliClientResource(
project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"
),
"dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"),
"dbt2": DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"),
"pyspark": pyspark_resource,
"step_launcher": db_step_launcher,
"monitor_fs": s3FileSystem(region_name="us-west-2", s3_bucket="hooli-demo"),
Expand Down
2 changes: 2 additions & 0 deletions hooli_data_eng/resources/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from typing import List, Optional
from dagster import OpExecutionContext

# No longer used in the project; kept as an example of customizing an integration resource.

class DbtCli2(DbtCliResource):
profiles_dir: str

Expand Down
2 changes: 2 additions & 0 deletions hooli_data_eng/resources/warehouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from dagster._core.execution.context.output import OutputContext
from typing import Sequence

# No longer used in the project; kept as an example of customizing an integration resource.

class MyDBIOManager(DbIOManager):

def _get_table_slice(self, context, output_context: OutputContext):
Expand Down
Loading