Commit 1b17ded

Cleanup

steinitzu committed May 10, 2024
1 parent dd678d0 commit 1b17ded
Showing 2 changed files with 33 additions and 26 deletions.
31 changes: 31 additions & 0 deletions dlt/pipeline/helpers.py
@@ -30,6 +30,7 @@
     _get_matching_resources,
     StateInjectableContext,
     Container,
+    pipeline_state as current_pipeline_state,
 )
 from dlt.common.destination.reference import WithStagingDataset

@@ -43,6 +44,7 @@
 from dlt.pipeline.typing import TPipelineStep
 from dlt.pipeline.drop import drop_resources
 from dlt.common.configuration.exceptions import ContextDefaultCannotBeCreated
+from dlt.extract import DltSource

 if TYPE_CHECKING:
     from dlt.pipeline import Pipeline

@@ -175,3 +177,32 @@ def drop(
     state_only: bool = False,
 ) -> None:
     return DropCommand(pipeline, resources, schema_name, state_paths, drop_all, state_only)()
+
+
+def refresh_source(pipeline: "Pipeline", source: DltSource) -> Dict[str, Any]:
+    """Run the pipeline's refresh mode on the given source, updating the source's schema and state.
+
+    Returns:
+        The new load package state containing tables that need to be dropped/truncated.
+    """
+    pipeline_state, _ = current_pipeline_state(pipeline._container)
+    if pipeline.refresh is None or pipeline.first_run:
+        return {}
+    _resources_to_drop = (
+        list(source.resources.extracted) if pipeline.refresh != "drop_dataset" else []
+    )
+    drop_result = drop_resources(
+        source.schema,
+        pipeline_state,
+        resources=_resources_to_drop,
+        drop_all=pipeline.refresh == "drop_dataset",
+        state_paths="*" if pipeline.refresh == "drop_dataset" else [],
+    )
+    load_package_state = {}
+    if drop_result.dropped_tables:
+        key = "dropped_tables" if pipeline.refresh != "drop_data" else "truncated_tables"
+        load_package_state[key] = drop_result.dropped_tables
+    source.schema = drop_result.schema
+    if "sources" in drop_result.state:
+        pipeline_state["sources"] = drop_result.state["sources"]
+    return load_package_state
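
The mode handling in the new helper is compact, so here is a minimal standalone sketch of the mapping it implements: how each refresh mode selects the drop_resources arguments and the load-package key. The plan_refresh helper below is hypothetical, written only for illustration; it is not part of dlt.

from typing import Any, Dict, List


def plan_refresh(refresh: str, extracted_resources: List[str]) -> Dict[str, Any]:
    """Hypothetical mirror of the argument selection inside refresh_source."""
    return {
        # "drop_dataset" drops everything, so no per-resource selection is made
        "resources": [] if refresh == "drop_dataset" else extracted_resources,
        # drop_all is only set for a full dataset refresh
        "drop_all": refresh == "drop_dataset",
        # all source state paths are wiped on a full refresh, none otherwise
        "state_paths": "*" if refresh == "drop_dataset" else [],
        # "drop_data" truncates tables instead of dropping them
        "load_package_key": "truncated_tables" if refresh == "drop_data" else "dropped_tables",
    }


print(plan_refresh("drop_dataset", ["users", "orders"]))
# resources=[], drop_all=True, state_paths='*', key='dropped_tables'
print(plan_refresh("drop_data", ["users", "orders"]))
# resources=['users', 'orders'], drop_all=False, state_paths=[], key='truncated_tables'
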
28 changes: 2 additions & 26 deletions dlt/pipeline/pipeline.py
@@ -150,7 +150,7 @@
 )
 from dlt.pipeline.warnings import credentials_argument_deprecated
 from dlt.common.storages.load_package import TLoadPackageState
-from dlt.pipeline.drop import drop_resources
+from dlt.pipeline.helpers import refresh_source


 def with_state_sync(may_extract_state: bool = False) -> Callable[[TFun], TFun]:

@@ -1095,25 +1095,6 @@ def _wipe_working_folder(self) -> None:
     def _attach_pipeline(self) -> None:
         pass

-    def _refresh_source(self, source: DltSource) -> Tuple[Schema, TPipelineState, Dict[str, Any]]:
-        if self.refresh is None or self.first_run:
-            return source.schema, self.state, {}
-        _resources_to_drop = (
-            list(source.resources.extracted) if self.refresh != "drop_dataset" else []
-        )
-        drop_result = drop_resources(
-            source.schema,
-            self.state,
-            resources=_resources_to_drop,
-            drop_all=self.refresh == "drop_dataset",
-            state_paths="*" if self.refresh == "drop_dataset" else [],
-        )
-        load_package_state = {}
-        if drop_result.dropped_tables:
-            key = "dropped_tables" if self.refresh != "drop_data" else "truncated_tables"
-            load_package_state[key] = drop_result.dropped_tables
-        return drop_result.schema, drop_result.state, load_package_state
-
     def _extract_source(
         self,
         extract: Extract,

@@ -1142,12 +1123,7 @@ def _extract_source(

         load_package_state_update = dict(load_package_state_update or {})
         if with_refresh:
-            new_schema, new_state, load_package_state = self._refresh_source(source)
-            load_package_state_update.update(load_package_state)
-            source.schema = new_schema
-            state, _ = current_pipeline_state(self._container)
-            if "sources" in new_state:
-                state["sources"] = new_state["sources"]
+            load_package_state_update.update(refresh_source(self, source))

         # extract into pipeline schema
         load_id = extract.extract(
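
At the call site, six lines collapse to one because refresh_source now assigns the refreshed schema back to the source and writes the wiped source state into the pipeline state itself, returning only the load package update. For context, a hedged sketch of how a user reaches this code path, assuming the refresh pipeline argument and the "drop_dataset" mode name as they exist on this branch; the resource, pipeline name, and destination below are illustrative assumptions.

import dlt


@dlt.resource
def users():
    yield [{"id": 1, "name": "alice"}]


# On every run after the first, _extract_source calls refresh_source, which
# in "drop_dataset" mode drops all tables and wipes all source state first.
pipeline = dlt.pipeline(
    pipeline_name="refresh_demo",
    destination="duckdb",
    refresh="drop_dataset",
)
pipeline.run(users())
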