Skip to content

Commit

Permalink
add load id to loadpackage info in current
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Mar 4, 2024
1 parent e60f2f1 commit 3229745
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 8 deletions.
20 changes: 14 additions & 6 deletions dlt/common/storages/load_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
Any,
Tuple,
TYPE_CHECKING,
TypedDict,
)

from dlt.common import pendulum, json
Expand Down Expand Up @@ -58,6 +59,13 @@ class TLoadPackageState(TVersionedState, total=False):
"""private space for destinations to store state relevant only to the load package"""


class TLoadPackage(TypedDict, total=False):
load_id: str
"""Load id"""
state: TLoadPackageState
"""State of the load package"""


# allows to upgrade state when restored with a new version of state logic/schema
LOADPACKAGE_STATE_ENGINE_VERSION = 1

Expand Down Expand Up @@ -632,7 +640,7 @@ def on_resolved(self) -> None:
def __init__(self, load_id: str, storage: PackageStorage) -> None: ...


def load_package_state() -> TLoadPackageState:
def load_package() -> TLoadPackage:
"""Get full load package state present in current context. Across all threads this will be the same in memory dict."""
container = Container()
# get injected state if present. injected load package state is typically "managed" so changes will be persisted
Expand All @@ -641,7 +649,7 @@ def load_package_state() -> TLoadPackageState:
state_ctx = container[LoadPackageStateInjectableContext]
except ContextDefaultCannotBeCreated:
raise Exception("Load package state not available")
return state_ctx.state
return TLoadPackage(state=state_ctx.state, load_id=state_ctx.load_id)


def commit_load_package_state() -> None:
Expand All @@ -656,13 +664,13 @@ def commit_load_package_state() -> None:

def destination_state() -> DictStrAny:
"""Get segment of load package state that is specific to the current destination."""
lp_state = load_package_state()
return lp_state.setdefault("destination_state", {})
lp = load_package()
return lp["state"].setdefault("destination_state", {})


def clear_destination_state(commit: bool = True) -> None:
"""Clear segment of load package state that is specific to the current destination. Optionally commit to load package."""
lp_state = load_package_state()
lp_state.pop("destination_state", None)
lp = load_package()
lp["state"].pop("destination_state", None)
if commit:
commit_load_package_state()
2 changes: 2 additions & 0 deletions dlt/destinations/impl/sink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

def capabilities(
preferred_loader_file_format: TLoaderFileFormat = "puae-jsonl",
naming_convention: str = "direct",
) -> DestinationCapabilitiesContext:
caps = DestinationCapabilitiesContext.generic_capabilities(preferred_loader_file_format)
caps.supported_loader_file_formats = ["puae-jsonl", "parquet"]
caps.supports_ddl_transactions = False
caps.supports_transactions = False
caps.naming_convention = naming_convention
return caps
1 change: 0 additions & 1 deletion dlt/extract/incremental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from functools import wraps



import dlt
from dlt.common.exceptions import MissingDependencyException
from dlt.common import pendulum, logger
Expand Down
2 changes: 1 addition & 1 deletion dlt/pipeline/current.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dlt.pipeline import pipeline as _pipeline
from dlt.extract.decorators import get_source_schema
from dlt.common.storages.load_package import (
load_package_state,
load_package,
commit_load_package_state,
destination_state,
clear_destination_state,
Expand Down

0 comments on commit 3229745

Please sign in to comment.