diff --git a/dlt/pipeline/__init__.py b/dlt/pipeline/__init__.py index 4101e58320..c6debf5a74 100644 --- a/dlt/pipeline/__init__.py +++ b/dlt/pipeline/__init__.py @@ -1,4 +1,4 @@ -from typing import Sequence, cast, overload +from typing import Sequence, Type, cast, overload from dlt.common.schema import Schema from dlt.common.schema.typing import TColumnSchema, TWriteDisposition, TSchemaContract @@ -97,6 +97,7 @@ def pipeline( full_refresh: bool = False, credentials: Any = None, progress: TCollectorArg = _NULL_COLLECTOR, + _impl_cls: Type[Pipeline] = Pipeline, **kwargs: Any, ) -> Pipeline: ensure_correct_pipeline_kwargs(pipeline, **kwargs) @@ -129,7 +130,7 @@ def pipeline( progress = collector_from_name(progress) # create new pipeline instance - p = Pipeline( + p = _impl_cls( pipeline_name, pipelines_dir, pipeline_salt,