diff --git a/python_modules/libraries/dagster-airlift/dagster_airlift/core/sensor/event_translation.py b/python_modules/libraries/dagster-airlift/dagster_airlift/core/sensor/event_translation.py index 2d03635a27052..fa3a2653ac748 100644 --- a/python_modules/libraries/dagster-airlift/dagster_airlift/core/sensor/event_translation.py +++ b/python_modules/libraries/dagster-airlift/dagster_airlift/core/sensor/event_translation.py @@ -99,11 +99,14 @@ def get_timestamp_from_materialization(event: AssetEvent) -> float: def synthetic_mats_for_peered_dag_asset_keys( - dag_run: DagRun, airflow_data: AirflowDefinitionsData + dag_run: DagRun, + airflow_data: AirflowDefinitionsData, + already_materialized_asset_keys: AbstractSet[AssetKey], ) -> Sequence[AssetMaterialization]: return [ dag_synthetic_mat(dag_run, airflow_data, asset_key) for asset_key in airflow_data.peered_dag_asset_keys_by_dag_handle[DagHandle(dag_run.dag_id)] + if asset_key not in already_materialized_asset_keys ] @@ -119,8 +122,10 @@ def synthetic_mats_for_mapped_dag_asset_keys( def dag_synthetic_mat( dag_run: DagRun, airflow_data: AirflowDefinitionsData, asset_key: AssetKey ) -> AssetMaterialization: return AssetMaterialization( - asset_key=asset_key, description=dag_run.note, metadata=get_dag_run_metadata(dag_run) + asset_key=asset_key, + description=dag_run.note, + metadata=get_dag_run_metadata(dag_run), ) diff --git a/python_modules/libraries/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py b/python_modules/libraries/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py index 402936ddee1c8..9f9dc5ae6e039 100644 --- a/python_modules/libraries/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py +++ b/python_modules/libraries/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py @@ -32,6 +32,7 @@ from dagster._time import datetime_from_timestamp, get_current_datetime from dagster_airlift.constants import ( + 
AIRFLOW_RUN_ID_METADATA_KEY, AUTOMAPPED_TASK_METADATA_KEY, DAG_RUN_ID_TAG_KEY, EFFECTIVE_TIMESTAMP_METADATA_KEY, @@ -49,6 +50,7 @@ synthetic_mats_for_peered_dag_asset_keys, synthetic_mats_for_task_instance, ) +from dagster_airlift.core.serialization.serialized_data import DagHandle MAIN_LOOP_TIMEOUT_SECONDS = DEFAULT_SENSOR_GRPC_TIMEOUT - 20 DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS = 1 @@ -262,8 +264,21 @@ def materializations_and_requests_from_batch_iter( context.log.info(f"Found {len(runs)} dag runs for {airflow_data.airflow_instance.name}") context.log.info(f"All runs {runs}") for i, dag_run in enumerate(runs): + peered_dag_asset_keys = airflow_data.peered_dag_asset_keys_by_dag_handle[ + DagHandle(dag_run.dag_id) + ] + records = context.instance.get_asset_records(list(peered_dag_asset_keys)) + + already_materialized = { + r.asset_entry.asset_key + for r in records + if check.not_none( + check.not_none(r.asset_entry.last_materialization).asset_materialization + ).metadata[AIRFLOW_RUN_ID_METADATA_KEY].value + == dag_run.run_id + } mats = build_synthetic_asset_materializations( - context, airflow_data.airflow_instance, dag_run, airflow_data + context, airflow_data.airflow_instance, dag_run, airflow_data, already_materialized ) context.log.info(f"Found {len(mats)} materializations for {dag_run.run_id}") @@ -284,6 +305,7 @@ def build_synthetic_asset_materializations( airflow_instance: AirflowInstance, dag_run: DagRun, airflow_data: AirflowDefinitionsData, + already_materialized: Set[AssetKey], ) -> List[AssetMaterialization]: """In this function we need to return the asset materializations we want to synthesize on behalf of the user. 
@@ -318,7 +340,9 @@ def build_synthetic_asset_materializations( ) synthetic_mats = [] - # Peered dag-level materializations will always be emitted. - synthetic_mats.extend(synthetic_mats_for_peered_dag_asset_keys(dag_run, airflow_data)) + # Peered dag-level materializations are emitted unless this dag run already materialized them. + synthetic_mats.extend( + synthetic_mats_for_peered_dag_asset_keys(dag_run, airflow_data, already_materialized) + ) # If there is a dagster run for this dag, we don't need to synthesize materializations for mapped dag assets. if not dagster_runs: synthetic_mats.extend(synthetic_mats_for_mapped_dag_asset_keys(dag_run, airflow_data)) diff --git a/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/airflow_dags/multi_location_dags.py b/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/airflow_dags/multi_location_dags.py index d122797a9923b..e75d1757856a9 100644 --- a/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/airflow_dags/multi_location_dags.py +++ b/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/airflow_dags/multi_location_dags.py @@ -34,6 +34,16 @@ def print_hello() -> None: ) as second_dag: PythonOperator(task_id="task", python_callable=print_hello) +with DAG( + "dag_shared_between_code_locations", + default_args=default_args, + schedule_interval=None, + is_paused_upon_creation=False, +) as shared_dag: + a = PythonOperator(task_id="first_task", python_callable=print_hello) + b = PythonOperator(task_id="second_task", python_callable=print_hello) + a >> b + proxying_to_dagster( proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"), diff --git a/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/dagster_multi_code_locations/first_dag_defs.py b/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/dagster_multi_code_locations/first_dag_defs.py index 50842e707859c..bdbff29259c7a 100644 --- a/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/dagster_multi_code_locations/first_dag_defs.py 
+++ b/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/dagster_multi_code_locations/first_dag_defs.py @@ -3,15 +3,28 @@ from kitchen_sink.airflow_instance import local_airflow_instance +dags_to_include = { + "dag_first_code_location", + "dag_shared_between_code_locations", +} + defs = build_defs_from_airflow_instance( airflow_instance=local_airflow_instance(), defs=Definitions( - assets=assets_with_task_mappings( - dag_id="dag_first_code_location", - task_mappings={ - "task": [AssetSpec(key="dag_first_code_location__asset")], - }, - ), + assets=[ + *assets_with_task_mappings( + dag_id="dag_first_code_location", + task_mappings={ + "task": [AssetSpec(key="dag_first_code_location__asset")], + }, + ), + *assets_with_task_mappings( + dag_id="dag_shared_between_code_locations", + task_mappings={ + "first_task": [AssetSpec(key="dag_shared_between_code_locations__first_asset")], + }, + ), + ], ), - dag_selector_fn=lambda dag_info: dag_info.dag_id == "dag_first_code_location", + dag_selector_fn=lambda dag_info: dag_info.dag_id in dags_to_include, ) diff --git a/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/dagster_multi_code_locations/second_dag_defs.py b/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/dagster_multi_code_locations/second_dag_defs.py index aa8b12a422669..9436cb995f321 100644 --- a/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/dagster_multi_code_locations/second_dag_defs.py +++ b/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/dagster_multi_code_locations/second_dag_defs.py @@ -3,15 +3,30 @@ from kitchen_sink.airflow_instance import local_airflow_instance +dags_to_include = { + "dag_second_code_location", + "dag_shared_between_code_locations", +} + defs = build_defs_from_airflow_instance( airflow_instance=local_airflow_instance(), defs=Definitions( - assets=assets_with_task_mappings( - dag_id="dag_second_code_location", - task_mappings={ - "task": 
[AssetSpec(key="dag_second_code_location__asset")], - }, - ), + assets=[ + *assets_with_task_mappings( + dag_id="dag_second_code_location", + task_mappings={ + "task": [AssetSpec(key="dag_second_code_location__asset")], + }, + ), + *assets_with_task_mappings( + dag_id="dag_shared_between_code_locations", + task_mappings={ + "second_task": [ + AssetSpec(key="dag_shared_between_code_locations__second_asset") + ], + }, + ), + ], ), - dag_selector_fn=lambda dag_info: dag_info.dag_id == "dag_second_code_location", + dag_selector_fn=lambda dag_info: dag_info.dag_id in dags_to_include, )