feat(airflow): implement parallel-isolated mode #979

Merged · 6 commits · Feb 26, 2024

Changes from 4 commits
56 changes: 51 additions & 5 deletions dlt/helpers/airflow_helper.py
@@ -1,17 +1,22 @@
import inspect
import os
from tempfile import gettempdir
from typing import Any, Callable, List, Literal, Optional, Sequence, Tuple

from tenacity import (
retry_if_exception,
wait_exponential,
stop_after_attempt,
Retrying,
RetryCallState,
)
from typing_extensions import get_origin

from dlt.common import pendulum
from dlt.common.exceptions import MissingDependencyException
from dlt.common.runtime.telemetry import with_telemetry
from dlt.common.typing import extract_inner_type
from dlt.extract.incremental import Incremental

try:
from airflow.configuration import conf
@@ -135,7 +140,7 @@ def add_run(
pipeline: Pipeline,
data: Any,
*,
decompose: Literal["none", "serialize", "parallel"] = "none",
decompose: Literal["none", "serialize", "parallel", "parallel-isolated"] = "none",
table_name: str = None,
write_disposition: TWriteDisposition = None,
loader_file_format: TLoaderFileFormat = None,
@@ -163,6 +168,9 @@ def add_run(
will remain sequential. Use another executor (e.g. CeleryExecutor)!
NOTE: The first component of the source is done first, after that
the rest are executed in parallel to each other.
parallel-isolated - decompose the source into a parallel Airflow task group.
NOTE: In case the SequentialExecutor is used by Airflow, the tasks
will remain sequential. Use another executor (e.g. CeleryExecutor)!
table_name (str): The name of the table to which the data should be loaded within the `dataset`
write_disposition (TWriteDisposition, optional): Same as in `run` command. Defaults to None.
loader_file_format (Literal["jsonl", "insert_values", "parquet"], optional): The file format the loader will use to create the load package.
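To illustrate the new `decompose` option documented above, here is a minimal sketch of a DAG wiring it up, modeled on the tests added in this PR. The DAG, pipeline, dataset, and source names are placeholders, and `my_source` is a hypothetical two-resource source used only for illustration; only `PipelineTasksGroup` and `add_run` come from this helper.

```python
import dlt
import pendulum
from airflow.decorators import dag
from dlt.helpers.airflow_helper import PipelineTasksGroup


@dlt.source
def my_source():
    # hypothetical two-resource source, only for illustration
    @dlt.resource
    def alpha():
        yield [{"value": 1}]

    @dlt.resource
    def beta():
        yield [{"value": 2}]

    return alpha, beta


@dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def my_isolated_dag():
    tasks = PipelineTasksGroup("my_pipeline", wipe_local_data=False)

    pipeline = dlt.pipeline(
        pipeline_name="my_pipeline",
        dataset_name="my_dataset",
        destination="duckdb",
    )

    # each strongly connected component of the source becomes its own task;
    # every task clones the pipeline under a unique name, so state is not shared
    tasks.add_run(
        pipeline,
        my_source(),
        decompose="parallel-isolated",
        trigger_rule="all_done",
        retries=0,
        provide_context=True,
    )


my_isolated_dag()
```

As the docstring notes, the tasks only actually run concurrently under an executor that supports parallelism (e.g. CeleryExecutor).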
@@ -194,12 +202,12 @@ def task_name(pipeline: Pipeline, data: Any) -> str:
# use factory function to make a task, in order to parametrize it
# passing arguments to task function (_run) is serializing
# them and running template engine on them
def make_task(pipeline: Pipeline, data: Any) -> PythonOperator:
def make_task(pipeline: Pipeline, data: Any, name: str = None) -> PythonOperator:
def _run() -> None:
# activate pipeline
pipeline.activate()
# drop local data
task_pipeline = pipeline.drop()
task_pipeline = pipeline.drop(pipeline_name=name)

# use task logger
if self.use_task_logger:
@@ -308,15 +316,53 @@ def log_after_attempt(retry_state: RetryCallState) -> None:
if pipeline.full_refresh:
raise ValueError("Cannot decompose pipelines with full_refresh set")

# parallel tasks
tasks = []
sources = data.decompose("scc")
t_name = task_name(pipeline, data)
start = make_task(pipeline, sources[0])

# parallel tasks
for source in sources[1:]:
for resource in source.resources.values():
if resource.incremental:
logger.warn(
(
f"The resource {resource.name} in task {t_name} "
"is using incremental loading and may modify the "
"state. Resources that modify the state should not "
"run in parallel within the single pipeline as the "
"state will not be correctly merged. Please use "
"'serialize' or 'parallel-isolated' modes instead."
)
)
break
tasks.append(make_task(pipeline, source))

end = DummyOperator(task_id=f"{task_name(pipeline, data)}_end")
end = DummyOperator(task_id=f"{t_name}_end")

if tasks:
start >> tasks >> end
return [start] + tasks + [end]

start >> end
return [start, end]
elif decompose == "parallel-isolated":
if not isinstance(data, DltSource):
raise ValueError("Can only decompose dlt sources")

if pipeline.full_refresh:
raise ValueError("Cannot decompose pipelines with full_refresh set")

# parallel tasks
tasks = []
t_name = task_name(pipeline, data)

start = DummyOperator(task_id=f"{t_name}_start")

for task_num, source in enumerate(data.decompose("scc"), start=1):
tasks.append(make_task(pipeline, source, t_name + "_" + str(task_num)))

end = DummyOperator(task_id=f"{t_name}_end")

if tasks:
start >> tasks >> end
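In `parallel-isolated` mode each decomposed component is run by a pipeline cloned under a derived name (`t_name + "_" + str(task_num)` in the loop above), so the components do not share state or working files. A hedged sketch of inspecting those per-component pipelines after a run; the derived name below is illustrative, not the exact format produced by `task_name`.

```python
import dlt

# assumption: the group task name resolved to "my_source_alpha-beta" and the
# source decomposed into two components, numbered from 1 as in the loop above
for task_num in (1, 2):
    component = dlt.attach(pipeline_name=f"my_source_alpha-beta_{task_num}")
    # each attached pipeline carries only the schema and state of its own component
    tables = [t["name"] for t in component.default_schema.data_tables()]
    print(component.pipeline_name, tables)
```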
10 changes: 7 additions & 3 deletions dlt/pipeline/pipeline.py
@@ -324,13 +324,17 @@ def __init__(
self.credentials = credentials
self._configure(import_schema_path, export_schema_path, must_attach_to_local_pipeline)

def drop(self) -> "Pipeline":
"""Deletes local pipeline state, schemas and any working files"""
def drop(self, pipeline_name: str = None) -> "Pipeline":
"""Deletes local pipeline state, schemas and any working files.

Args:
pipeline_name (str, optional): A new name for the cloned pipeline. Defaults to the current pipeline name.
"""
# reset the pipeline working dir
self._create_pipeline()
# clone the pipeline
return Pipeline(
self.pipeline_name,
pipeline_name or self.pipeline_name,
self.pipelines_dir,
self.pipeline_salt,
self.destination,
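The new optional `pipeline_name` argument lets `drop()` return its clone under a different name, which is what the Airflow helper uses to give each isolated task its own pipeline. A minimal usage sketch, mirroring the test added further down; the names are placeholders.

```python
import dlt

pipeline = dlt.pipeline(pipeline_name="old_pipeline_name", destination="duckdb")

# drop() resets the local working folder and returns a clone of the pipeline;
# passing pipeline_name gives that clone a new identity (and thus fresh local state)
new_pipeline = pipeline.drop(pipeline_name="new_pipeline_name")

assert new_pipeline.pipeline_name == "new_pipeline_name"
```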
121 changes: 121 additions & 0 deletions tests/helpers/airflow_tests/test_airflow_wrapper.py
@@ -1,5 +1,6 @@
import os
import pytest
from unittest import mock
from typing import List
from airflow import DAG
from airflow.decorators import dag
@@ -75,6 +76,23 @@ def resource():
return resource


@dlt.source
def mock_data_incremental_source():
@dlt.resource
def resource1(a: str = None, b=None, c=None):
yield ["s", "a"]

@dlt.resource
def resource2(
updated_at: dlt.sources.incremental[str] = dlt.sources.incremental(
"updated_at", initial_value="1970-01-01T00:00:00Z"
)
):
yield [{"updated_at": "1970-02-01T00:00:00Z"}]

return resource1, resource2


@dlt.source(section="mock_data_source_state")
def mock_data_source_state():
@dlt.resource(selected=True)
@@ -275,6 +293,109 @@ def dag_parallel():
assert task.upstream_task_ids == set([dag_def.tasks[0].task_id])


def test_parallel_incremental():
pipeline_standalone = dlt.pipeline(
pipeline_name="pipeline_parallel",
dataset_name="mock_data_" + uniq_id(),
destination="duckdb",
credentials=":pipeline:",
)
pipeline_standalone.run(mock_data_incremental_source())

tasks_list: List[PythonOperator] = None

quackdb_path = os.path.join(TEST_STORAGE_ROOT, "pipeline_dag_parallel.duckdb")

@dag(schedule=None, start_date=DEFAULT_DATE, catchup=False, default_args=default_args)
def dag_parallel():
nonlocal tasks_list
tasks = PipelineTasksGroup(
"pipeline_dag_parallel", local_data_folder=TEST_STORAGE_ROOT, wipe_local_data=False
)

# set duckdb to be outside of pipeline folder which is dropped on each task
pipeline_dag_parallel = dlt.pipeline(
pipeline_name="pipeline_dag_parallel",
dataset_name="mock_data_" + uniq_id(),
destination="duckdb",
credentials=quackdb_path,
)
tasks.add_run(
pipeline_dag_parallel,
mock_data_incremental_source(),
decompose="parallel",
trigger_rule="all_done",
retries=0,
provide_context=True,
)

with mock.patch("dlt.helpers.airflow_helper.logger.warn") as warn_mock:
dag_def = dag_parallel()
dag_def.test()
warn_mock.assert_called_once()


def test_parallel_isolated_run():
pipeline_standalone = dlt.pipeline(
pipeline_name="pipeline_parallel",
dataset_name="mock_data_" + uniq_id(),
destination="duckdb",
credentials=":pipeline:",
)
pipeline_standalone.run(mock_data_source())
pipeline_standalone_counts = load_table_counts(
pipeline_standalone, *[t["name"] for t in pipeline_standalone.default_schema.data_tables()]
)

tasks_list: List[PythonOperator] = None

quackdb_path = os.path.join(TEST_STORAGE_ROOT, "pipeline_dag_parallel.duckdb")

@dag(schedule=None, start_date=DEFAULT_DATE, catchup=False, default_args=default_args)
def dag_parallel():
nonlocal tasks_list
tasks = PipelineTasksGroup(
"pipeline_dag_parallel", local_data_folder=TEST_STORAGE_ROOT, wipe_local_data=False
)

# set duckdb to be outside of pipeline folder which is dropped on each task
pipeline_dag_parallel = dlt.pipeline(
pipeline_name="pipeline_dag_parallel",
dataset_name="mock_data_" + uniq_id(),
destination="duckdb",
credentials=quackdb_path,
)
tasks_list = tasks.add_run(
pipeline_dag_parallel,
mock_data_source(),
decompose="parallel-isolated",
trigger_rule="all_done",
retries=0,
provide_context=True,
)

dag_def = dag_parallel()
assert len(tasks_list) == 5
dag_def.test()

results = {}
for i in range(1, 4):
pipeline_dag_parallel = dlt.attach(
pipeline_name=f"mock_data_source__r_init-_t_init_post-_t1-_t2-2-more_{i}"
)
pipeline_dag_decomposed_counts = load_table_counts(
pipeline_dag_parallel,
*[t["name"] for t in pipeline_dag_parallel.default_schema.data_tables()],
)
results.update(pipeline_dag_decomposed_counts)

assert results == pipeline_standalone_counts

for task in dag_def.tasks[1:4]:
assert task.downstream_task_ids == set([dag_def.tasks[-1].task_id])
assert task.upstream_task_ids == set([dag_def.tasks[0].task_id])


def test_parallel_run_single_resource():
pipeline_standalone = dlt.pipeline(
pipeline_name="pipeline_parallel",
10 changes: 10 additions & 0 deletions tests/pipeline/test_pipeline.py
@@ -1258,6 +1258,16 @@ def generic(start):
assert pipeline.default_schema.get_table("single_table")["resource"] == "state1"


def test_drop_with_new_name() -> None:
old_test_name = "old_pipeline_name"
new_test_name = "new_pipeline_name"

pipeline = dlt.pipeline(pipeline_name=old_test_name, destination="duckdb")
new_pipeline = pipeline.drop(pipeline_name=new_test_name)

assert new_pipeline.pipeline_name == new_test_name


def test_remove_autodetect() -> None:
now = pendulum.now()
