Skip to content

Commit

Permalink
make amps work better (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
slopp authored Oct 24, 2023
1 parent 59e5f4b commit f911a36
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 25 deletions.
5 changes: 0 additions & 5 deletions dbt_project/models/ANALYTICS/company_stats.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
{{
config(
dagster_auto_materialize_policy={"type":"lazy"}
)
}}
select
order_date,
company,
Expand Down
5 changes: 0 additions & 5 deletions dbt_project/models/ANALYTICS/order_stats.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
{{
config(
dagster_auto_materialize_policy={"type":"eager"}
)
}}
select
{{ date_trunc("day", "order_date") }} as order_date,
count(*) as n_orders,
Expand Down
5 changes: 0 additions & 5 deletions dbt_project/models/ANALYTICS/sku_stats.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
{{
config(
dagster_auto_materialize_policy={"type":"eager"}
)
}}
select
order_date,
sku,
Expand Down
1 change: 0 additions & 1 deletion dbt_project/models/ANALYTICS/weekly_order_summary.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@

{{
config(
dagster_auto_materialize_policy={"type":"eager"},
dagster_freshness_policy={"cron_schedule": "0 9 * * MON", "maximum_lag_minutes": (24+9)*60}
)
}}
Expand Down
5 changes: 0 additions & 5 deletions dbt_project/models/MARKETING/company_perf.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
{{
config(
dagster_auto_materialize_policy={"type":"lazy"}
)
}}
select
company,
sum(n_orders) as n_orders,
Expand Down
38 changes: 35 additions & 3 deletions hooli_data_eng/assets/dbt_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
)
from dagster_dbt.asset_decorator import dbt_assets
from dagster import (
AutoMaterializePolicy,
AutoMaterializeRule,
AssetKey,
DailyPartitionsDefinition,
WeeklyPartitionsDefinition,
Expand Down Expand Up @@ -35,6 +37,15 @@
file_relative_path(__file__, "../../dbt_project/target/manifest.json")
)

allow_outdated_parents_policy = AutoMaterializePolicy.eager().without_rules(
AutoMaterializeRule.skip_on_parent_outdated()
)

allow_outdated_and_missing_parents_policy = AutoMaterializePolicy.eager().without_rules(
AutoMaterializeRule.skip_on_parent_outdated(),
AutoMaterializeRule.skip_on_parent_missing() # non-partitioned assets should run even if some upstream partitions are missing
)


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str:
Expand All @@ -51,6 +62,9 @@ def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
if node_path == "models/sources.yml":
prefix = "RAW_DATA"

if node_path == "MARKETING/company_perf.sql":
prefix = "ANALYTICS"

return AssetKey([prefix, dbt_resource_props["name"]])

def get_group_name(self, dbt_resource_props: Mapping[str, Any]):
Expand All @@ -60,6 +74,9 @@ def get_group_name(self, dbt_resource_props: Mapping[str, Any]):
if node_path == "models/sources.yml":
prefix = "RAW_DATA"

if node_path == "MARKETING/company_perf.sql":
prefix = "ANALYTICS"

return prefix

def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
Expand All @@ -71,10 +88,24 @@ def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, An
if dbt_resource_props["name"] == "users_cleaned":
metadata = {"partition_expr": "created_at"}

if dbt_resource_props["name"] in ["company_perf", "sku_stats", "company_stats"]:
metadata = {}

default_metadata = default_metadata_from_dbt_resource_props(dbt_resource_props)

return {**default_metadata, **metadata}

def get_auto_materialize_policy(
self, dbt_resource_props: Mapping[str, Any]
):
return allow_outdated_parents_policy


class CustomDagsterDbtTranslatorForViews(CustomDagsterDbtTranslator):
def get_auto_materialize_policy(
self, dbt_resource_props: Mapping[str, Any]
):
return allow_outdated_and_missing_parents_policy

def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
# map partition key range to dbt vars
Expand Down Expand Up @@ -145,8 +176,9 @@ def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
dbt_views = load_assets_from_dbt_project(
DBT_PROJECT_DIR,
DBT_PROFILES_DIR,
key_prefix=["ANALYTICS"],
source_key_prefix="ANALYTICS",
#key_prefix=["ANALYTICS"],
#source_key_prefix="ANALYTICS",
select="company_perf sku_stats company_stats",
node_info_to_group_fn=lambda x: "ANALYTICS",
#node_info_to_group_fn=lambda x: "ANALYTICS",
dagster_dbt_translator=CustomDagsterDbtTranslatorForViews()
)
4 changes: 3 additions & 1 deletion hooli_data_eng/assets/marketing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@
)
import pandas as pd

from hooli_data_eng.assets.dbt_assets import allow_outdated_parents_policy

# These assets take data from a SQL table managed by
# dbt and create summaries using pandas
# The assets are updated via freshness policies
# and an associated reconciliation sensor
@asset(
key_prefix="MARKETING",
freshness_policy=FreshnessPolicy(maximum_lag_minutes=24*60),
auto_materialize_policy=AutoMaterializePolicy.lazy(),
auto_materialize_policy=allow_outdated_parents_policy,
compute_kind="pandas",
op_tags={"owner": "[email protected]"},
ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])}
Expand Down

0 comments on commit f911a36

Please sign in to comment.