Skip to content

Commit

Permalink
removes pipe id, removes separate resource name, defines how resources are cloned and added to the source after it is created
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Oct 5, 2023
1 parent 2783f98 commit a9d2501
Show file tree
Hide file tree
Showing 8 changed files with 434 additions and 194 deletions.
6 changes: 2 additions & 4 deletions dlt/extract/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,7 @@ def _write_static_table(resource: DltResource, table_name: str) -> None:

signals.raise_if_signalled()

# TODO: many resources may be returned. if that happens the item meta must be present with table name and this name must match one of resources
# if meta contains table name
resource = source.resources.find_by_pipe(pipe_item.pipe)
resource = source.resources[pipe_item.pipe.name]
table_name: str = None
if isinstance(pipe_item.meta, TableNameMeta):
table_name = pipe_item.meta.table_name
Expand Down Expand Up @@ -187,7 +185,7 @@ def extract_with_schema(
for resource in source.resources.extracted.values():
with contextlib.suppress(DataItemRequiredForDynamicTableHints):
if resource.write_disposition == "replace":
_reset_resource_state(resource._name)
_reset_resource_state(resource.name)

extractor = extract(extract_id, source, storage, collector, max_parallel_items=max_parallel_items, workers=workers)
# iterate over all items in the pipeline and update the schema if dynamic table hints were present
Expand Down
49 changes: 29 additions & 20 deletions dlt/extract/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from threading import Thread
from typing import Any, ContextManager, Optional, Sequence, Union, Callable, Iterable, Iterator, List, NamedTuple, Awaitable, Tuple, Type, TYPE_CHECKING, Literal
from typing import Any, ContextManager, Dict, Optional, Sequence, Union, Callable, Iterable, Iterator, List, NamedTuple, Awaitable, Tuple, Type, TYPE_CHECKING, Literal

from dlt.common import sleep
from dlt.common.configuration import configspec
Expand Down Expand Up @@ -105,7 +105,6 @@ def __init__(self, name: str, steps: List[TPipeStep] = None, parent: "Pipe" = No
self.name = name
self._gen_idx = 0
self._steps: List[TPipeStep] = []
self._pipe_id = f"{name}_{id(self)}"
self.parent = parent
# add the steps, this will check and mod transformations
if steps:
Expand Down Expand Up @@ -407,22 +406,29 @@ def _ensure_transform_step(self, step_no: int, step: TPipeStep) -> None:
else:
raise InvalidStepFunctionArguments(self.name, callable_name, sig, str(ty_ex))

def _clone(self, keep_pipe_id: bool = True, new_name: str = None) -> "Pipe":
"""Clones the pipe steps, optionally keeping the pipe id or renaming the pipe. Used internally to clone a list of connected pipes."""
assert not (new_name and keep_pipe_id), "Cannot keep pipe id when renaming the pipe"
p = Pipe(new_name or self.name, [], self.parent)
def _clone(self, new_name: str = None, with_parent: bool = False) -> "Pipe":
"""Clones the pipe steps, optionally renaming the pipe. Used internally to clone a list of connected pipes."""
new_parent = self.parent
if with_parent and self.parent and not self.parent.is_empty:
parent_new_name = new_name
if new_name:
# if we are renaming the pipe, then also rename the parent
if self.name in self.parent.name:
parent_new_name = self.parent.name.replace(self.name, new_name)
else:
parent_new_name = f"{self.parent.name}_{new_name}"
new_parent = self.parent._clone(parent_new_name, with_parent)

p = Pipe(new_name or self.name, [], new_parent)
p._steps = self._steps.copy()
# clone shares the id with the original
if keep_pipe_id:
p._pipe_id = self._pipe_id
return p

def __repr__(self) -> str:
if self.has_parent:
bound_str = " data bound to " + repr(self.parent)
else:
bound_str = ""
return f"Pipe {self.name} ({self._pipe_id})[steps: {len(self._steps)}] at {id(self)}{bound_str}"
return f"Pipe {self.name} [steps: {len(self._steps)}] at {id(self)}{bound_str}"


class PipeIterator(Iterator[PipeItem]):
Expand Down Expand Up @@ -487,7 +493,7 @@ def from_pipes(
# print(f"max_parallel_items: {max_parallel_items} workers: {workers}")
extract = cls(max_parallel_items, workers, futures_poll_interval, next_item_mode)
# clone all pipes before iterating (recursively) as we will fork them (this adds steps) and evaluate gens
pipes = PipeIterator.clone_pipes(pipes)
pipes, _ = PipeIterator.clone_pipes(pipes)


def _fork_pipeline(pipe: Pipe) -> None:
Expand Down Expand Up @@ -709,7 +715,7 @@ def _get_source_item_current(self) -> ResolvablePipeItem:
try:
# get items from last added iterator, this makes the overall Pipe as close to FIFO as possible
gen, step, pipe, meta = self._sources[-1]
# print(f"got {pipe.name} {pipe._pipe_id}")
# print(f"got {pipe.name}")
# register current pipe name during the execution of gen
set_current_pipe_name(pipe.name)
item = None
Expand Down Expand Up @@ -743,7 +749,7 @@ def _get_source_item_round_robin(self) -> ResolvablePipeItem:
if sources_count > self._initial_sources_count:
return self._get_source_item_current()
try:
# print(f"got {pipe.name} {pipe._pipe_id}")
# print(f"got {pipe.name}")
# register current pipe name during the execution of gen
item = None
while item is None:
Expand Down Expand Up @@ -775,10 +781,12 @@ def _get_source_item_round_robin(self) -> ResolvablePipeItem:
raise ResourceExtractionError(pipe.name, gen, str(ex), "generator") from ex

@staticmethod
def clone_pipes(pipes: Sequence[Pipe]) -> List[Pipe]:
def clone_pipes(pipes: Sequence[Pipe], existing_cloned_pairs: Dict[int, Pipe] = None) -> Tuple[List[Pipe], Dict[int, Pipe]]:
"""This will clone pipes and fix the parent/dependent references"""
cloned_pipes = [p._clone() for p in pipes]
cloned_pipes = [p._clone() for p in pipes if id(p) not in (existing_cloned_pairs or {})]
cloned_pairs = {id(p): c for p, c in zip(pipes, cloned_pipes)}
if existing_cloned_pairs:
cloned_pairs.update(existing_cloned_pairs)

for clone in cloned_pipes:
while True:
Expand All @@ -788,16 +796,17 @@ def clone_pipes(pipes: Sequence[Pipe]) -> List[Pipe]:
if clone.parent in cloned_pairs.values():
break
# clone if parent pipe not yet cloned
if id(clone.parent) not in cloned_pairs:
parent_id = id(clone.parent)
if parent_id not in cloned_pairs:
# print("cloning:" + clone.parent.name)
cloned_pairs[id(clone.parent)] = clone.parent._clone()
cloned_pairs[parent_id] = clone.parent._clone()
# replace with clone
# print(f"replace depends on {clone.name} to {clone.parent.name}")
clone.parent = cloned_pairs[id(clone.parent)]
# recurr with clone
clone.parent = cloned_pairs[parent_id]
# recur with clone
clone = clone.parent

return cloned_pipes
return cloned_pipes, cloned_pairs


class ManagedPipeIterator(PipeIterator):
Expand Down
Loading

0 comments on commit a9d2501

Please sign in to comment.