feat(airflow): implement parallel-isolated mode #979

Merged 6 commits on Feb 26, 2024
Changes from 1 commit
31 changes: 29 additions & 2 deletions dlt/helpers/airflow_helper.py
@@ -135,7 +135,7 @@ def add_run(
pipeline: Pipeline,
data: Any,
*,
decompose: Literal["none", "serialize", "parallel"] = "none",
decompose: Literal["none", "serialize", "parallel", "parallel-isolated"] = "none",
table_name: str = None,
write_disposition: TWriteDisposition = None,
loader_file_format: TLoaderFileFormat = None,
@@ -163,6 +163,9 @@ def add_run(
will remain sequential. Use another executor (e.g. CeleryExecutor)!
NOTE: The first component of the source is done first, after that
the rest are executed in parallel to each other.
parallel-isolated - decompose the source into a parallel Airflow task group.
NOTE: In case the SequentialExecutor is used by Airflow, the tasks
will remain sequential. Use another executor (e.g. CeleryExecutor)!
table_name: (str): The name of the table to which the data should be loaded within the `dataset`
write_disposition (TWriteDisposition, optional): Same as in `run` command. Defaults to None.
loader_file_format (Literal["jsonl", "insert_values", "parquet"], optional): The file format the loader will use to create the load package.
@@ -308,16 +311,40 @@ def log_after_attempt(retry_state: RetryCallState) -> None:
if pipeline.full_refresh:
raise ValueError("Cannot decompose pipelines with full_refresh set")

# parallel tasks
tasks = []
sources = data.decompose("scc")
start = make_task(pipeline, sources[0])

# parallel tasks
for source in sources[1:]:
tasks.append(make_task(pipeline, source))

end = DummyOperator(task_id=f"{task_name(pipeline, data)}_end")

if tasks:
start >> tasks >> end
return [start] + tasks + [end]

start >> end
return [start, end]
elif decompose == "parallel-isolated":
if not isinstance(data, DltSource):
raise ValueError("Can only decompose dlt sources")

if pipeline.full_refresh:
raise ValueError("Cannot decompose pipelines with full_refresh set")

# parallel tasks
tasks = []
t_name = task_name(pipeline, data)

start = DummyOperator(task_id=f"{t_name}_start")

for source in data.decompose("scc"):
tasks.append(make_task(pipeline, source))

end = DummyOperator(task_id=f"{t_name}_end")

if tasks:
start >> tasks >> end
return [start] + tasks + [end]
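For orientation, here is a minimal sketch of how the new mode can be wired into a DAG, following the `PipelineTasksGroup.add_run` pattern exercised by the test below. The source, pipeline names, dataset name, and DuckDB path are illustrative assumptions, not part of this PR:

```python
import pendulum
import dlt

from airflow.decorators import dag
from dlt.helpers.airflow_helper import PipelineTasksGroup


@dlt.source
def demo_source():
    # Two independent resources, so "scc" decomposition yields two components.
    @dlt.resource
    def users():
        yield [{"id": 1}, {"id": 2}]

    @dlt.resource
    def orders():
        yield [{"order_id": 10, "user_id": 1}]

    return users, orders


@dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def demo_parallel_isolated():
    # Task group that manages the local pipeline working directories.
    tasks = PipelineTasksGroup("demo_pipeline", wipe_local_data=True)

    # Keep the DuckDB file outside the pipeline working folder, which is
    # dropped after each task (path is illustrative).
    pipeline = dlt.pipeline(
        pipeline_name="demo_pipeline",
        dataset_name="demo_dataset",
        destination="duckdb",
        credentials="/tmp/demo_pipeline.duckdb",
    )

    # Each component of the source becomes its own isolated task, all of them
    # running in parallel between a start and an end dummy task.
    tasks.add_run(
        pipeline,
        demo_source(),
        decompose="parallel-isolated",
        trigger_rule="all_done",
        retries=0,
    )


demo_parallel_isolated()
```

With a SequentialExecutor the tasks still run one after another, as the docstring notes; a CeleryExecutor or similar is needed for real parallelism.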
54 changes: 54 additions & 0 deletions tests/helpers/airflow_tests/test_airflow_wrapper.py
@@ -275,6 +275,60 @@ def dag_parallel():
assert task.upstream_task_ids == set([dag_def.tasks[0].task_id])


def test_parallel_isolated_run():
pipeline_standalone = dlt.pipeline(
pipeline_name="pipeline_parallel",
dataset_name="mock_data_" + uniq_id(),
destination="duckdb",
credentials=":pipeline:",
)
pipeline_standalone.run(mock_data_source())
pipeline_standalone_counts = load_table_counts(
pipeline_standalone, *[t["name"] for t in pipeline_standalone.default_schema.data_tables()]
)

tasks_list: List[PythonOperator] = None

quackdb_path = os.path.join(TEST_STORAGE_ROOT, "pipeline_dag_parallel.duckdb")

@dag(schedule=None, start_date=DEFAULT_DATE, catchup=False, default_args=default_args)
def dag_parallel():
nonlocal tasks_list
tasks = PipelineTasksGroup(
"pipeline_dag_parallel", local_data_folder=TEST_STORAGE_ROOT, wipe_local_data=False
)

# set duckdb to be outside of pipeline folder which is dropped on each task
pipeline_dag_parallel = dlt.pipeline(
pipeline_name="pipeline_dag_parallel",
dataset_name="mock_data_" + uniq_id(),
destination="duckdb",
credentials=quackdb_path,
)
tasks_list = tasks.add_run(
pipeline_dag_parallel,
mock_data_source(),
decompose="parallel-isolated",
trigger_rule="all_done",
retries=0,
provide_context=True,
)

dag_def = dag_parallel()
assert len(tasks_list) == 5
dag_def.test()
pipeline_dag_parallel = dlt.attach(pipeline_name="pipeline_dag_parallel")
pipeline_dag_decomposed_counts = load_table_counts(
pipeline_dag_parallel,
*[t["name"] for t in pipeline_dag_parallel.default_schema.data_tables()],
)
assert pipeline_dag_decomposed_counts == pipeline_standalone_counts

for task in dag_def.tasks[1:4]:
assert task.downstream_task_ids == set([dag_def.tasks[-1].task_id])
assert task.upstream_task_ids == set([dag_def.tasks[0].task_id])


def test_parallel_run_single_resource():
pipeline_standalone = dlt.pipeline(
pipeline_name="pipeline_parallel",
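The trailing assertions in `test_parallel_isolated_run` verify the fan-out/fan-in shape that `start >> tasks >> end` produces in the helper. A self-contained Airflow sketch of that dependency pattern, with made-up DAG and task ids:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator


with DAG(dag_id="fan_out_fan_in_demo", start_date=datetime(2024, 1, 1), schedule=None):
    start = DummyOperator(task_id="source_start")
    components = [DummyOperator(task_id=f"component_{i}") for i in range(3)]
    end = DummyOperator(task_id="source_end")

    # Shifting a list fans out from `start` and fans back in to `end`:
    # every task in `components` gets `start` as its only upstream dependency
    # and `end` as its only downstream dependency.
    start >> components >> end

    assert all(t.upstream_task_ids == {start.task_id} for t in components)
    assert all(t.downstream_task_ids == {end.task_id} for t in components)
```

This is exactly the structure the test checks on `dag_def.tasks[1:4]`: each middle task has the start dummy as its sole upstream and the end dummy as its sole downstream.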