Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-airlift] Handle sensor existing in multiple code locations #26209

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,14 @@ def get_timestamp_from_materialization(event: AssetEvent) -> float:


def synthetic_mats_for_peered_dag_asset_keys(
    dag_run: DagRun,
    airflow_data: AirflowDefinitionsData,
    already_materialized_asset_keys: AbstractSet[AssetKey],
) -> Sequence[AssetMaterialization]:
    """Build synthetic dag-level materializations for a completed dag run.

    One materialization is synthesized per asset key peered to the dag
    (looked up via ``peered_dag_asset_keys_by_dag_handle``), skipping any key
    in ``already_materialized_asset_keys`` — presumably keys already emitted
    for this same Airflow run by another code location's sensor, so shared
    dags do not produce duplicate materializations (TODO confirm with caller).

    Args:
        dag_run: The Airflow dag run the materializations describe.
        airflow_data: Definitions data mapping dag handles to peered asset keys.
        already_materialized_asset_keys: Keys to exclude from synthesis.

    Returns:
        A list of synthetic ``AssetMaterialization`` objects.
    """
    peered_keys = airflow_data.peered_dag_asset_keys_by_dag_handle[DagHandle(dag_run.dag_id)]
    return [
        dag_synthetic_mat(dag_run, airflow_data, asset_key)
        for asset_key in peered_keys
        if asset_key not in already_materialized_asset_keys
    ]


Expand All @@ -119,8 +122,11 @@ def synthetic_mats_for_mapped_dag_asset_keys(
def dag_synthetic_mat(
    dag_run: DagRun, airflow_data: AirflowDefinitionsData, asset_key: AssetKey
) -> AssetMaterialization:
    """Build a single synthetic materialization of ``asset_key`` for a dag run.

    The dag run's note is used as the materialization description, and the
    run's metadata (via ``get_dag_run_metadata``) is attached as metadata.

    Args:
        dag_run: The Airflow dag run being represented.
        airflow_data: Definitions data (kept for signature parity with callers;
            not read in this body).
        asset_key: The asset key to materialize.

    Returns:
        The synthetic ``AssetMaterialization``.
    """
    # Development-time debug print removed: it was flagged in review as
    # unnecessary in production code.
    return AssetMaterialization(
        asset_key=asset_key,
        description=dag_run.note,
        metadata=get_dag_run_metadata(dag_run),
    )


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from dagster._time import datetime_from_timestamp, get_current_datetime

from dagster_airlift.constants import (
AIRFLOW_RUN_ID_METADATA_KEY,
AUTOMAPPED_TASK_METADATA_KEY,
DAG_RUN_ID_TAG_KEY,
EFFECTIVE_TIMESTAMP_METADATA_KEY,
Expand All @@ -49,6 +50,7 @@
synthetic_mats_for_peered_dag_asset_keys,
synthetic_mats_for_task_instance,
)
from dagster_airlift.core.serialization.serialized_data import DagHandle

MAIN_LOOP_TIMEOUT_SECONDS = DEFAULT_SENSOR_GRPC_TIMEOUT - 20
DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS = 1
Expand Down Expand Up @@ -262,8 +264,27 @@ def materializations_and_requests_from_batch_iter(
context.log.info(f"Found {len(runs)} dag runs for {airflow_data.airflow_instance.name}")
context.log.info(f"All runs {runs}")
for i, dag_run in enumerate(runs):
peered_dag_asset_keys = airflow_data.peered_dag_asset_keys_by_dag_handle[
DagHandle(dag_run.dag_id)
]
records = context.instance.get_asset_records(list(peered_dag_asset_keys))
print(f"Found {len(records)} records for {dag_run.run_id}")
for record in records:
airflow_run_id = check.not_none(
check.not_none(record.asset_entry.last_materialization).asset_materialization
).metadata[AIRFLOW_RUN_ID_METADATA_KEY].value
print(f"Airflow run id: {airflow_run_id}")

already_materialized = {
r.asset_entry.asset_key
for r in records
if check.not_none(
check.not_none(r.asset_entry.last_materialization).asset_materialization
).metadata[AIRFLOW_RUN_ID_METADATA_KEY].value
== dag_run.run_id
}
mats = build_synthetic_asset_materializations(
context, airflow_data.airflow_instance, dag_run, airflow_data
context, airflow_data.airflow_instance, dag_run, airflow_data, already_materialized
)
context.log.info(f"Found {len(mats)} materializations for {dag_run.run_id}")

Expand All @@ -284,6 +305,7 @@ def build_synthetic_asset_materializations(
airflow_instance: AirflowInstance,
dag_run: DagRun,
airflow_data: AirflowDefinitionsData,
already_materialized: Set[AssetKey],
) -> List[AssetMaterialization]:
"""In this function we need to return the asset materializations we want to synthesize
on behalf of the user.
Expand Down Expand Up @@ -318,7 +340,9 @@ def build_synthetic_asset_materializations(
)
synthetic_mats = []
# Peered dag-level materializations will always be emitted.
synthetic_mats.extend(synthetic_mats_for_peered_dag_asset_keys(dag_run, airflow_data))
synthetic_mats.extend(
synthetic_mats_for_peered_dag_asset_keys(dag_run, airflow_data, already_materialized)
)
# If there is a dagster run for this dag, we don't need to synthesize materializations for mapped dag assets.
if not dagster_runs:
synthetic_mats.extend(synthetic_mats_for_mapped_dag_asset_keys(dag_run, airflow_data))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ def print_hello() -> None:
) as second_dag:
PythonOperator(task_id="task", python_callable=print_hello)

# Dag intentionally shared across two Dagster code locations: elsewhere in this
# change, one location maps assets to "first_task" and the other to
# "second_task" — TODO confirm against the kitchen_sink defs modules.
with DAG(
    "dag_shared_between_code_locations",
    default_args=default_args,
    schedule_interval=None,  # no schedule: runs are triggered externally
    is_paused_upon_creation=False,  # created unpaused so runs can start immediately
) as shared_dag:
    a = PythonOperator(task_id="first_task", python_callable=print_hello)
    b = PythonOperator(task_id="second_task", python_callable=print_hello)
    a >> b  # first_task must complete before second_task


proxying_to_dagster(
proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,27 @@

from kitchen_sink.airflow_instance import local_airflow_instance

# Dags from the shared Airflow instance that this code location ingests.
# Includes the dag shared with the other code location; only the "first_task"
# asset of the shared dag is mapped here.
dags_to_include = {
    "dag_first_code_location",
    "dag_shared_between_code_locations",
}
defs = build_defs_from_airflow_instance(
    airflow_instance=local_airflow_instance(),
    defs=Definitions(
        assets=[
            *assets_with_task_mappings(
                dag_id="dag_first_code_location",
                task_mappings={
                    "task": [AssetSpec(key="dag_first_code_location__asset")],
                },
            ),
            *assets_with_task_mappings(
                dag_id="dag_shared_between_code_locations",
                task_mappings={
                    "first_task": [AssetSpec(key="dag_shared_between_code_locations__first_asset")],
                },
            ),
        ],
    ),
    # Restrict ingestion to the dags this code location is responsible for.
    dag_selector_fn=lambda dag_info: dag_info.dag_id in dags_to_include,
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,30 @@

from kitchen_sink.airflow_instance import local_airflow_instance

# Dags from the shared Airflow instance that this code location ingests.
# Includes the dag shared with the other code location; only the "second_task"
# asset of the shared dag is mapped here.
dags_to_include = {
    "dag_second_code_location",
    "dag_shared_between_code_locations",
}

defs = build_defs_from_airflow_instance(
    airflow_instance=local_airflow_instance(),
    defs=Definitions(
        assets=[
            *assets_with_task_mappings(
                dag_id="dag_second_code_location",
                task_mappings={
                    "task": [AssetSpec(key="dag_second_code_location__asset")],
                },
            ),
            *assets_with_task_mappings(
                dag_id="dag_shared_between_code_locations",
                task_mappings={
                    "second_task": [
                        AssetSpec(key="dag_shared_between_code_locations__second_asset")
                    ],
                },
            ),
        ],
    ),
    # Restrict ingestion to the dags this code location is responsible for.
    dag_selector_fn=lambda dag_info: dag_info.dag_id in dags_to_include,
)