From e4ad1075b04f1526531fca93acdf2f998b046536 Mon Sep 17 00:00:00 2001 From: Christian Minich Date: Thu, 2 May 2024 13:03:17 -0400 Subject: [PATCH] add a dummy asset with custom CLL on a srouce --- hooli_data_eng/assets/amp_test/amp_test.py | 41 +++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/hooli_data_eng/assets/amp_test/amp_test.py b/hooli_data_eng/assets/amp_test/amp_test.py index 215bc3d..a21de95 100644 --- a/hooli_data_eng/assets/amp_test/amp_test.py +++ b/hooli_data_eng/assets/amp_test/amp_test.py @@ -1,4 +1,10 @@ -from dagster import AutoMaterializePolicy, AssetExecutionContext, DataVersion, asset, observable_source_asset +from dagster import ( + AutoMaterializePolicy, AssetExecutionContext, DataVersion, asset, observable_source_asset, SourceAsset, + MaterializeResult, + TableColumnDep, + AssetKey, + TableColumnLineage, +) from dagster._utils import file_relative_path import os import pandas as pd @@ -18,6 +24,39 @@ def source_file(): return DataVersion(str(os.path.getmtime(USERS_CSV_PATH)) ) +# Define the source asset +my_source_asset = SourceAsset(key="my_source_asset") + +# Define the dependent asset +@asset( + deps=["my_source_asset"], +) +def dependent_asset() -> MaterializeResult: + return MaterializeResult( + metadata={ + "dagster/column_lineage": TableColumnLineage( + deps_by_column={ + "new_column_foo": [ + TableColumnDep( + asset_key=AssetKey("my_source_asset"), + column_name="column_bar", + ), + TableColumnDep( + asset_key=AssetKey("my_source_asset"), + column_name="column_baz", + ), + ], + "new_column_qux": [ + TableColumnDep( + asset_key=AssetKey("my_source_asset"), + column_name="column_quuz", + ), + ], + } + ) + } + ) + @asset( deps=["observable_source_asset_key"], auto_materialize_policy=AutoMaterializePolicy.eager(),