Commit f069071
Merge pull request #2112 from dlt-hub/devel
master merge for 1.4.1 release
rudolfix authored Dec 2, 2024
2 parents 0fce1c8 + b4d807f commit f069071
Showing 157 changed files with 5,657 additions and 2,209 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test_dbt_runner.yml
@@ -60,7 +60,7 @@ jobs:

       - name: Install dependencies
         # install dlt with postgres support
-        run: poetry install --no-interaction -E postgres --with sentry-sdk,dbt
+        run: poetry install --no-interaction -E postgres -E postgis --with sentry-sdk,dbt
 
       - name: create secrets.toml
         run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
@@ -78,7 +78,7 @@ jobs:

       - name: Install dependencies
         # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
-        run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline -E deltalake
+        run: poetry install --no-interaction -E redshift -E postgis -E postgres -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline -E deltalake
 
       - name: create secrets.toml
         run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
4 changes: 2 additions & 2 deletions .github/workflows/test_local_destinations.yml
@@ -48,7 +48,7 @@ jobs:
       # Label used to access the service container
       postgres:
         # Docker Hub image
-        image: postgres
+        image: postgis/postgis
         # Provide the password for postgres
         env:
           POSTGRES_DB: dlt_data
@@ -95,7 +95,7 @@ jobs:
           key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations
 
       - name: Install dependencies
-        run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline -E deltalake
+        run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline -E deltalake
 
       - name: Start SFTP server
         run: docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" up -d
4 changes: 2 additions & 2 deletions .github/workflows/test_local_sources.yml
@@ -43,7 +43,7 @@ jobs:
       # Label used to access the service container
       postgres:
         # Docker Hub image
-        image: postgres
+        image: postgis/postgis
         # Provide the password for postgres
         env:
           POSTGRES_DB: dlt_data
@@ -83,7 +83,7 @@ jobs:

       # TODO: which deps should we enable?
       - name: Install dependencies
-        run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E sql_database --with sentry-sdk,pipeline,sources
+        run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E sql_database --with sentry-sdk,pipeline,sources
 
       # run sources tests in load against configured destinations
       - run: poetry run pytest tests/load/sources
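Taken together, the four workflow changes above install dlt's postgis extra and, wherever a Postgres service container is used, swap the stock postgres image for postgis/postgis, so the test matrix can exercise geometry support. A hedged sketch of the kind of pipeline such a setup could run — the table name and WKT values are illustrative, and how geometry columns are detected or annotated is not shown in this diff:

import dlt

# rows carrying WKT geometries; Postgres credentials come from
# .dlt/secrets.toml, as created in the workflow steps above
rows = [
    {"id": 1, "geom": "POINT(16.37 48.21)"},
    {"id": 2, "geom": "LINESTRING(0 0, 1 1)"},
]

pipeline = dlt.pipeline(
    pipeline_name="geo_demo", destination="postgres", dataset_name="geo"
)
print(pipeline.run(rows, table_name="places"))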
13 changes: 6 additions & 7 deletions dlt/cli/deploy_command_helpers.py
@@ -5,7 +5,6 @@
 from yaml import Dumper
 from itertools import chain
 from typing import List, Optional, Sequence, Tuple, Any, Dict
-from astunparse import unparse
 
 # optional dependencies
 import pipdeptree
@@ -23,7 +22,7 @@
 from dlt.common.git import get_origin, get_repo, Repo
 from dlt.common.configuration.specs.runtime_configuration import get_default_pipeline_name
 from dlt.common.typing import StrAny
-from dlt.common.reflection.utils import evaluate_node_literal
+from dlt.common.reflection.utils import evaluate_node_literal, ast_unparse
 from dlt.common.pipeline import LoadInfo, TPipelineState, get_dlt_repos_dir
 from dlt.common.storages import FileStorage
 from dlt.common.utils import set_working_dir
@@ -313,7 +312,7 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio
             if f_r_value is None:
                 fmt.warning(
                     "The value of `dev_mode` in call to `dlt.pipeline` cannot be"
-                    f" determined from {unparse(f_r_node).strip()}. We assume that you know"
+                    f" determined from {ast_unparse(f_r_node).strip()}. We assume that you know"
                     " what you are doing :)"
                 )
             if f_r_value is True:
@@ -331,8 +330,8 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio
                 raise CliCommandInnerException(
                     "deploy",
                     "The value of 'pipelines_dir' argument in call to `dlt_pipeline` cannot be"
-                    f" determined from {unparse(p_d_node).strip()}. Pipeline working dir will"
-                    " be found. Pass it directly with --pipelines-dir option.",
+                    f" determined from {ast_unparse(p_d_node).strip()}. Pipeline working dir"
+                    " will be found. Pass it directly with --pipelines-dir option.",
                 )
 
         p_n_node = call_args.arguments.get("pipeline_name")
@@ -342,8 +341,8 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio
                 raise CliCommandInnerException(
                     "deploy",
                     "The value of 'pipeline_name' argument in call to `dlt_pipeline` cannot be"
-                    f" determined from {unparse(p_d_node).strip()}. Pipeline working dir will"
-                    " be found. Pass it directly with --pipeline-name option.",
+                    f" determined from {ast_unparse(p_d_node).strip()}. Pipeline working dir"
+                    " will be found. Pass it directly with --pipeline-name option.",
                 )
         pipelines.append((pipeline_name, pipelines_dir))

5 changes: 2 additions & 3 deletions dlt/cli/source_detection.py
@@ -1,11 +1,10 @@
 import ast
 import inspect
-from astunparse import unparse
 from typing import Dict, Tuple, Set, List
 
 from dlt.common.configuration import is_secret_hint
 from dlt.common.configuration.specs import BaseConfiguration
-from dlt.common.reflection.utils import creates_func_def_name_node
+from dlt.common.reflection.utils import creates_func_def_name_node, ast_unparse
 from dlt.common.typing import is_optional_type
 
 from dlt.sources import SourceReference
@@ -65,7 +64,7 @@ def find_source_calls_to_replace(
     for calls in visitor.known_sources_resources_calls.values():
         for call in calls:
             transformed_nodes.append(
-                (call.func, ast.Name(id=pipeline_name + "_" + unparse(call.func)))
+                (call.func, ast.Name(id=pipeline_name + "_" + ast_unparse(call.func)))
             )
 
     return transformed_nodes
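Both CLI modules above drop the third-party astunparse dependency in favor of an ast_unparse helper imported from dlt.common.reflection.utils. Its implementation is not part of this diff; a plausible sketch, assuming it simply prefers the stdlib ast.unparse (available since Python 3.9) and falls back to the astunparse package on older interpreters:

import ast

def ast_unparse(node: ast.AST) -> str:
    # stdlib ast.unparse exists since Python 3.9
    if hasattr(ast, "unparse"):
        return ast.unparse(node)
    # fall back to the astunparse package on older interpreters
    from astunparse import unparse
    return unparse(node)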
39 changes: 22 additions & 17 deletions dlt/common/configuration/specs/azure_credentials.py
@@ -9,15 +9,32 @@
     configspec,
 )
 from dlt import version
+from dlt.common.utils import without_none
 
 _AZURE_STORAGE_EXTRA = f"{version.DLT_PKG_NAME}[az]"
 
 
 @configspec
-class AzureCredentialsWithoutDefaults(CredentialsConfiguration):
+class AzureCredentialsBase(CredentialsConfiguration):
+    azure_storage_account_name: str = None
+    azure_account_host: Optional[str] = None
+    """Alternative host when accessing blob storage endpoint ie. my_account.dfs.core.windows.net"""
+
+    def to_adlfs_credentials(self) -> Dict[str, Any]:
+        pass
+
+    def to_object_store_rs_credentials(self) -> Dict[str, str]:
+        # https://docs.rs/object_store/latest/object_store/azure
+        creds: Dict[str, Any] = without_none(self.to_adlfs_credentials())  # type: ignore[assignment]
+        # only string options accepted
+        creds.pop("anon", None)
+        return creds
+
+
+@configspec
+class AzureCredentialsWithoutDefaults(AzureCredentialsBase):
     """Credentials for Azure Blob Storage, compatible with adlfs"""
 
-    azure_storage_account_name: str = None
     azure_storage_account_key: Optional[TSecretStrValue] = None
     azure_storage_sas_token: TSecretStrValue = None
     azure_sas_token_permissions: str = "racwdl"
@@ -29,17 +46,9 @@ def to_adlfs_credentials(self) -> Dict[str, Any]:
             account_name=self.azure_storage_account_name,
             account_key=self.azure_storage_account_key,
             sas_token=self.azure_storage_sas_token,
+            account_host=self.azure_account_host,
         )
 
-    def to_object_store_rs_credentials(self) -> Dict[str, str]:
-        # https://docs.rs/object_store/latest/object_store/azure
-        creds = self.to_adlfs_credentials()
-        if creds["sas_token"] is None:
-            creds.pop("sas_token")
-        if creds["account_key"] is None:
-            creds.pop("account_key")
-        return creds
-
     def create_sas_token(self) -> None:
         try:
             from azure.storage.blob import generate_account_sas, ResourceTypes
@@ -63,24 +72,20 @@ def on_partial(self) -> None:


 @configspec
-class AzureServicePrincipalCredentialsWithoutDefaults(CredentialsConfiguration):
-    azure_storage_account_name: str = None
+class AzureServicePrincipalCredentialsWithoutDefaults(AzureCredentialsBase):
     azure_tenant_id: str = None
     azure_client_id: str = None
     azure_client_secret: TSecretStrValue = None
 
     def to_adlfs_credentials(self) -> Dict[str, Any]:
         return dict(
             account_name=self.azure_storage_account_name,
+            account_host=self.azure_account_host,
             tenant_id=self.azure_tenant_id,
             client_id=self.azure_client_id,
             client_secret=self.azure_client_secret,
         )
 
-    def to_object_store_rs_credentials(self) -> Dict[str, str]:
-        # https://docs.rs/object_store/latest/object_store/azure
-        return self.to_adlfs_credentials()
-
 
 @configspec
 class AzureCredentials(AzureCredentialsWithoutDefaults, CredentialsWithDefault):
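The refactor above replaces two hand-rolled None-stripping copies of to_object_store_rs_credentials with a single base-class version built on without_none, the dlt.common.utils helper that drops dictionary keys whose value is None. A quick sketch of the resulting behavior (values are illustrative):

from dlt.common.utils import without_none

adlfs_creds = {"account_name": "my_account", "account_key": None, "sas_token": None, "anon": False}
rs_creds = dict(without_none(adlfs_creds))
rs_creds.pop("anon", None)  # object_store accepts only string options
assert rs_creds == {"account_name": "my_account"}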
14 changes: 11 additions & 3 deletions dlt/common/destination/reference.py
@@ -76,12 +76,15 @@
     try:
         from dlt.common.libs.pandas import DataFrame
         from dlt.common.libs.pyarrow import Table as ArrowTable
+        from dlt.common.libs.ibis import BaseBackend as IbisBackend
     except MissingDependencyException:
         DataFrame = Any
         ArrowTable = Any
+        IbisBackend = Any
 else:
     DataFrame = Any
     ArrowTable = Any
+    IbisBackend = Any
 
 
 class StorageSchemaInfo(NamedTuple):
@@ -291,7 +294,6 @@ def _make_dataset_name(self, schema_name: str) -> str:
         # if default schema is None then suffix is not added
         if self.default_schema_name is not None and schema_name != self.default_schema_name:
             return (self.dataset_name or "") + "_" + schema_name
-
         return self.dataset_name


@@ -443,8 +445,9 @@ def run_managed(
             self._finished_at = pendulum.now()
             # sanity check
             assert self._state in ("completed", "retry", "failed")
-            # wake up waiting threads
-            signals.wake_all()
+            if self._state != "retry":
+                # wake up waiting threads
+                signals.wake_all()
 
     @abstractmethod
     def run(self) -> None:
@@ -574,12 +577,17 @@ def close(self) -> None: ...
 class SupportsReadableDataset(Protocol):
     """A readable dataset retrieved from a destination, has support for creating readable relations for a query or table"""
 
+    @property
+    def schema(self) -> Schema: ...
+
     def __call__(self, query: Any) -> SupportsReadableRelation: ...
 
     def __getitem__(self, table: str) -> SupportsReadableRelation: ...
 
     def __getattr__(self, table: str) -> SupportsReadableRelation: ...
 
+    def ibis(self) -> IbisBackend: ...
+
 
 class JobClientBase(ABC):
     def __init__(
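With the changes above, SupportsReadableDataset gains a schema property and an ibis() method that hands back an ibis backend (typed through the new IbisBackend import guard). A hedged sketch of consumer code, assuming a local duckdb pipeline and the ibis dependency installed:

import dlt

pipeline = dlt.pipeline(pipeline_name="demo", destination="duckdb")
pipeline.run([{"id": 1}], table_name="items")

dataset = pipeline.dataset()  # implements SupportsReadableDataset
print(dataset.schema.name)    # the new schema property
con = dataset.ibis()          # connected ibis backend
print(con.list_tables())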
6 changes: 5 additions & 1 deletion dlt/common/destination/typing.py
@@ -1,6 +1,10 @@
 from typing import Optional
 
-from dlt.common.schema.typing import _TTableSchemaBase, TWriteDisposition, TTableReferenceParam
+from dlt.common.schema.typing import (
+    _TTableSchemaBase,
+    TWriteDisposition,
+    TTableReferenceParam,
+)
 
 
 class PreparedTableSchema(_TTableSchemaBase, total=False):
Empty file.
@@ -2,9 +2,7 @@

 from typing import Any, Callable, List, Literal, Optional, Sequence, TypeVar, Union
 
-from dlt.common.schema.typing import TColumnNames
-from dlt.common.typing import TSortOrder
-from dlt.extract.items import TTableHintTemplate
+from dlt.common.typing import TSortOrder, TTableHintTemplate, TColumnNames
 
 TCursorValue = TypeVar("TCursorValue", bound=Any)
 LastValueFunc = Callable[[Sequence[TCursorValue]], Any]
@@ -19,10 +17,12 @@ class IncrementalColumnState(TypedDict):

 class IncrementalArgs(TypedDict, total=False):
     cursor_path: str
-    initial_value: Optional[str]
-    last_value_func: Optional[LastValueFunc[str]]
+    initial_value: Optional[Any]
+    last_value_func: Optional[Union[LastValueFunc[str], Literal["min", "max"]]]
+    """Last value callable or name of built in function"""
     primary_key: Optional[TTableHintTemplate[TColumnNames]]
-    end_value: Optional[str]
+    end_value: Optional[Any]
     row_order: Optional[TSortOrder]
     allow_external_schedulers: Optional[bool]
     lag: Optional[Union[float, int]]
     on_cursor_value_missing: Optional[OnCursorValueMissing]
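The widened IncrementalArgs accepts initial_value and end_value of any type and allows the strings "min" and "max" in place of a last-value callable. A hedged sketch of an incremental spec this now permits, e.g. in a rest_api source configuration (field names are illustrative):

# "max" stands in for the built-in max() last-value function
incremental_args = {
    "cursor_path": "updated_at",
    "initial_value": "2024-01-01T00:00:00Z",
    "last_value_func": "max",
}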
47 changes: 46 additions & 1 deletion dlt/common/jsonpath.py
@@ -1,4 +1,4 @@
-from typing import Iterable, Union, List, Any
+from typing import Iterable, Union, List, Any, Optional, cast
 from itertools import chain
 
 from dlt.common.typing import DictStrAny
@@ -46,3 +46,48 @@ def resolve_paths(paths: TAnyJsonPath, data: DictStrAny) -> List[str]:
     paths = compile_paths(paths)
     p: JSONPath
     return list(chain.from_iterable((str(r.full_path) for r in p.find(data)) for p in paths))
+
+
+def is_simple_field_path(path: JSONPath) -> bool:
+    """Checks if the given path represents a simple single field name.
+
+    Example:
+        >>> is_simple_field_path(compile_path('id'))
+        True
+        >>> is_simple_field_path(compile_path('$.id'))
+        False
+    """
+    return isinstance(path, JSONPathFields) and len(path.fields) == 1 and path.fields[0] != "*"
+
+
+def extract_simple_field_name(path: Union[str, JSONPath]) -> Optional[str]:
+    """
+    Extracts a simple field name from a JSONPath if it represents a single field access.
+    Returns None if the path is complex (contains wildcards, array indices, or multiple fields).
+
+    Args:
+        path: A JSONPath object or string
+
+    Returns:
+        Optional[str]: The field name if path represents a simple field access, None otherwise
+
+    Example:
+        >>> extract_simple_field_name('name')
+        'name'
+        >>> extract_simple_field_name('"name"')
+        'name'
+        >>> extract_simple_field_name('"na$me"')  # Escaped characters are preserved
+        'na$me'
+        >>> extract_simple_field_name('"na.me"')  # Escaped characters are preserved
+        'na.me'
+        >>> extract_simple_field_name('$.name')  # Returns None
+        >>> extract_simple_field_name('$.items[*].name')  # Returns None
+        >>> extract_simple_field_name('*')  # Returns None
+    """
+    if isinstance(path, str):
+        path = compile_path(path)
+
+    if is_simple_field_path(path):
+        return cast(str, path.fields[0])
+
+    return None
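The new helpers let callers short-circuit full JSONPath evaluation when a path is really just a single field name, mirroring the doctests above:

from dlt.common.jsonpath import compile_path, extract_simple_field_name, is_simple_field_path

assert is_simple_field_path(compile_path("id"))
assert not is_simple_field_path(compile_path("$.id"))
assert extract_simple_field_name("name") == "name"
assert extract_simple_field_name("$.items[*].name") is None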