Skip to content

Commit

Permalink
Merge branch 'devel' into exp/ibis_expressions
Browse files Browse the repository at this point in the history
# Conflicts:
#	.github/workflows/test_destinations.yml
#	.github/workflows/test_local_destinations.yml
#	dlt/destinations/dataset.py
#	poetry.lock
#	tests/load/test_read_interfaces.py
  • Loading branch information
sh-rp committed Dec 6, 2024
2 parents 7af8870 + 31fa78c commit 3bbad35
Show file tree
Hide file tree
Showing 112 changed files with 3,611 additions and 1,925 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test_dbt_runner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:

- name: Install dependencies
# install dlt with postgres support
run: poetry install --no-interaction -E postgres --with sentry-sdk,dbt
run: poetry install --no-interaction -E postgres -E postgis --with sentry-sdk,dbt

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline,ibis -E deltalake
run: poetry install --no-interaction -E redshift -E postgis -E postgres -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline,ibis -E deltalake

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test_local_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
# Label used to access the service container
postgres:
# Docker Hub image
image: postgres
image: postgis/postgis
# Provide the password for postgres
env:
POSTGRES_DB: dlt_data
Expand Down Expand Up @@ -95,7 +95,7 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline,ibis -E deltalake
run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline,ibis -E deltalake

- name: Start SFTP server
run: docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" up -d
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test_local_sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
# Label used to access the service container
postgres:
# Docker Hub image
image: postgres
image: postgis/postgis
# Provide the password for postgres
env:
POSTGRES_DB: dlt_data
Expand Down Expand Up @@ -83,7 +83,7 @@ jobs:

# TODO: which deps should we enable?
- name: Install dependencies
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E sql_database --with sentry-sdk,pipeline,sources
run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E sql_database --with sentry-sdk,pipeline,sources

# run sources tests in load against configured destinations
- run: poetry run pytest tests/load/sources
Expand Down
13 changes: 6 additions & 7 deletions dlt/cli/deploy_command_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from yaml import Dumper
from itertools import chain
from typing import List, Optional, Sequence, Tuple, Any, Dict
from astunparse import unparse

# optional dependencies
import pipdeptree
Expand All @@ -23,7 +22,7 @@
from dlt.common.git import get_origin, get_repo, Repo
from dlt.common.configuration.specs.runtime_configuration import get_default_pipeline_name
from dlt.common.typing import StrAny
from dlt.common.reflection.utils import evaluate_node_literal
from dlt.common.reflection.utils import evaluate_node_literal, ast_unparse
from dlt.common.pipeline import LoadInfo, TPipelineState, get_dlt_repos_dir
from dlt.common.storages import FileStorage
from dlt.common.utils import set_working_dir
Expand Down Expand Up @@ -313,7 +312,7 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio
if f_r_value is None:
fmt.warning(
"The value of `dev_mode` in call to `dlt.pipeline` cannot be"
f" determined from {unparse(f_r_node).strip()}. We assume that you know"
f" determined from {ast_unparse(f_r_node).strip()}. We assume that you know"
" what you are doing :)"
)
if f_r_value is True:
Expand All @@ -331,8 +330,8 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio
raise CliCommandInnerException(
"deploy",
"The value of 'pipelines_dir' argument in call to `dlt_pipeline` cannot be"
f" determined from {unparse(p_d_node).strip()}. Pipeline working dir will"
" be found. Pass it directly with --pipelines-dir option.",
f" determined from {ast_unparse(p_d_node).strip()}. Pipeline working dir"
" will be found. Pass it directly with --pipelines-dir option.",
)

p_n_node = call_args.arguments.get("pipeline_name")
Expand All @@ -342,8 +341,8 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio
raise CliCommandInnerException(
"deploy",
"The value of 'pipeline_name' argument in call to `dlt_pipeline` cannot be"
f" determined from {unparse(p_d_node).strip()}. Pipeline working dir will"
" be found. Pass it directly with --pipeline-name option.",
f" determined from {ast_unparse(p_d_node).strip()}. Pipeline working dir"
" will be found. Pass it directly with --pipeline-name option.",
)
pipelines.append((pipeline_name, pipelines_dir))

Expand Down
5 changes: 2 additions & 3 deletions dlt/cli/source_detection.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import ast
import inspect
from astunparse import unparse
from typing import Dict, Tuple, Set, List

from dlt.common.configuration import is_secret_hint
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.reflection.utils import creates_func_def_name_node
from dlt.common.reflection.utils import creates_func_def_name_node, ast_unparse
from dlt.common.typing import is_optional_type

from dlt.sources import SourceReference
Expand Down Expand Up @@ -65,7 +64,7 @@ def find_source_calls_to_replace(
for calls in visitor.known_sources_resources_calls.values():
for call in calls:
transformed_nodes.append(
(call.func, ast.Name(id=pipeline_name + "_" + unparse(call.func)))
(call.func, ast.Name(id=pipeline_name + "_" + ast_unparse(call.func)))
)

return transformed_nodes
Expand Down
1 change: 0 additions & 1 deletion dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
DataFrame = Any
ArrowTable = Any
IbisBackend = Any

else:
DataFrame = Any
ArrowTable = Any
Expand Down
6 changes: 5 additions & 1 deletion dlt/common/destination/typing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from typing import Optional

from dlt.common.schema.typing import _TTableSchemaBase, TWriteDisposition, TTableReferenceParam
from dlt.common.schema.typing import (
_TTableSchemaBase,
TWriteDisposition,
TTableReferenceParam,
)


class PreparedTableSchema(_TTableSchemaBase, total=False):
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

from typing import Any, Callable, List, Literal, Optional, Sequence, TypeVar, Union

from dlt.common.schema.typing import TColumnNames
from dlt.common.typing import TSortOrder
from dlt.extract.items import TTableHintTemplate
from dlt.common.typing import TSortOrder, TTableHintTemplate, TColumnNames

TCursorValue = TypeVar("TCursorValue", bound=Any)
LastValueFunc = Callable[[Sequence[TCursorValue]], Any]
Expand All @@ -19,10 +17,12 @@ class IncrementalColumnState(TypedDict):

class IncrementalArgs(TypedDict, total=False):
cursor_path: str
initial_value: Optional[str]
last_value_func: Optional[LastValueFunc[str]]
initial_value: Optional[Any]
last_value_func: Optional[Union[LastValueFunc[str], Literal["min", "max"]]]
"""Last value callable or name of built in function"""
primary_key: Optional[TTableHintTemplate[TColumnNames]]
end_value: Optional[str]
end_value: Optional[Any]
row_order: Optional[TSortOrder]
allow_external_schedulers: Optional[bool]
lag: Optional[Union[float, int]]
on_cursor_value_missing: Optional[OnCursorValueMissing]
5 changes: 3 additions & 2 deletions dlt/common/libs/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
raise MissingDependencyException("dlt Pandas Helpers", ["pandas"])


def pandas_to_arrow(df: pandas.DataFrame) -> Any:
def pandas_to_arrow(df: pandas.DataFrame, preserve_index: bool = False) -> Any:
"""Converts pandas to arrow or raises an exception if pyarrow is not installed"""
from dlt.common.libs.pyarrow import pyarrow as pa

return pa.Table.from_pandas(df)
# NOTE: None preserves named indexes but ignores unnamed
return pa.Table.from_pandas(df, preserve_index=preserve_index)
11 changes: 9 additions & 2 deletions dlt/common/libs/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,15 +620,22 @@ def row_tuples_to_arrow(
)
float_array = pa.array(columnar_known_types[field.name], type=pa.float64())
columnar_known_types[field.name] = float_array.cast(field.type, safe=False)
if issubclass(py_type, (dict, list)):
if issubclass(py_type, (dict, list, set)):
logger.warning(
f"Field {field.name} was reflected as JSON type and needs to be serialized back to"
" string to be placed in arrow table. This will slow data extraction down. You"
" should cast JSON field to STRING in your database system ie. by creating and"
" extracting an SQL VIEW that selects with cast."
)
json_str_array = pa.array(
[None if s is None else json.dumps(s) for s in columnar_known_types[field.name]]
[
(
None
if s is None
else json.dumps(s) if not issubclass(type(s), set) else json.dumps(list(s))
)
for s in columnar_known_types[field.name]
]
)
columnar_known_types[field.name] = json_str_array

Expand Down
141 changes: 141 additions & 0 deletions dlt/common/normalizers/json/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""
Cached helper methods for all operations that are called often
"""
from functools import lru_cache
from typing import Any, Dict, List, Optional, Tuple, cast

from dlt.common.json import json
from dlt.common.destination.utils import resolve_merge_strategy
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.normalizers.typing import TRowIdType
from dlt.common.normalizers.utils import DLT_ID_LENGTH_BYTES
from dlt.common.schema import Schema
from dlt.common.schema.typing import TColumnSchema, C_DLT_ID, DLT_NAME_PREFIX
from dlt.common.schema.utils import (
get_columns_names_with_prop,
get_first_column_name_with_prop,
is_nested_table,
)
from dlt.common.utils import digest128


@lru_cache(maxsize=None)
def shorten_fragments(naming: NamingConvention, *idents: str) -> str:
    """Memoized pass-through to `naming.shorten_fragments`.

    Args:
        naming: Naming convention whose `shorten_fragments` method does the work.
        *idents: Identifier fragments, forwarded unchanged and in order.

    Returns:
        Whatever `naming.shorten_fragments(*idents)` returns.

    Results are kept in an unbounded `lru_cache` keyed on `(naming, *idents)`,
    so `naming` must be hashable.
    """
    shortened = naming.shorten_fragments(*idents)
    return shortened


@lru_cache(maxsize=None)
def normalize_table_identifier(schema: Schema, naming: NamingConvention, table_name: str) -> str:
    """Normalize a table name with `naming`, memoizing the result.

    The schema's normalizer config key `use_break_path_on_normalize` picks the
    method: when it is truthy or missing (the default is True) the name goes
    through `naming.normalize_tables_path`, otherwise through
    `naming.normalize_table_identifier`.

    Args:
        schema: Schema whose `_normalizers_config` mapping is consulted.
        naming: Naming convention that performs the normalization.
        table_name: Table name to normalize.

    Returns:
        The value returned by the selected `naming` method.
    """
    break_path = schema._normalizers_config.get("use_break_path_on_normalize", True)
    normalizer = naming.normalize_tables_path if break_path else naming.normalize_table_identifier
    return normalizer(table_name)


@lru_cache(maxsize=None)
def normalize_identifier(schema: Schema, naming: NamingConvention, identifier: str) -> str:
    """Normalize an identifier with `naming`, memoizing the result.

    The schema's normalizer config key `use_break_path_on_normalize` picks the
    method: when it is truthy or missing (the default is True) the identifier
    goes through `naming.normalize_path`, otherwise through
    `naming.normalize_identifier`.

    Args:
        schema: Schema whose `_normalizers_config` mapping is consulted.
        naming: Naming convention that performs the normalization.
        identifier: Identifier to normalize.

    Returns:
        The value returned by the selected `naming` method.
    """
    break_path = schema._normalizers_config.get("use_break_path_on_normalize", True)
    normalizer = naming.normalize_path if break_path else naming.normalize_identifier
    return normalizer(identifier)


@lru_cache(maxsize=None)
def get_table_nesting_level(
    schema: Schema, table_name: str, default_nesting: int = 1000
) -> Optional[int]:
    """Return the `max_nesting` configured for a table, or `default_nesting`.

    The value is read from the table's `x-normalizer` hints. `default_nesting`
    is returned when the table is not in `schema.tables` (or its entry is
    empty), or when no `max_nesting` hint is set. No parent table is consulted
    here.

    Args:
        schema: Schema whose `tables` mapping is searched.
        table_name: Name of the table to look up.
        default_nesting: Fallback used when no explicit level is configured.

    Returns:
        The configured nesting level (which may be 0) or `default_nesting`.
    """
    table = schema.tables.get(table_name)
    if not table:
        return default_nesting

    normalizer_hints = table.get("x-normalizer", {})
    max_nesting = cast(int, normalizer_hints.get("max_nesting"))
    # compare against None explicitly: a configured level of 0 is valid
    return default_nesting if max_nesting is None else max_nesting


@lru_cache(maxsize=None)
def get_primary_key(schema: Schema, table_name: str) -> List[str]:
    """Return the names of the primary key columns of a table, memoized.

    Args:
        schema: Schema holding the table.
        table_name: Name of the table to inspect.

    Returns:
        Column names carrying the `primary_key` property (incomplete columns
        included), or an empty list when the table is not in `schema.tables`.

    NOTE: the returned list comes from the cache, so the same list object is
    handed out on repeated calls with the same arguments.
    """
    if table_name in schema.tables:
        return get_columns_names_with_prop(
            schema.get_table(table_name), "primary_key", include_incomplete=True
        )
    return []


@lru_cache(maxsize=None)
def is_nested_type(
    schema: Schema,
    table_name: str,
    field_name: str,
    _r_lvl: int,
) -> bool:
    """Tell whether a nested object under `field_name` should be left in place.

    Args:
        schema: Schema providing table definitions and preferred types.
        table_name: Table the field belongs to.
        field_name: Name of the field holding the nested object.
        _r_lvl: Remaining nesting budget; it is counted backwards.

    Returns:
        True when the nesting budget is used up (`_r_lvl <= 0`) or when the
        field's data type resolves to "json"; False otherwise.

    Cache perf (from the original author): max_nesting < _r_lvl: ~2x faster,
    full check 10x faster.
    """
    # nesting level is counted backwards: if we have traversed to or beyond the
    # calculated nesting level, we detect a nested type
    if _r_lvl <= 0:
        return True

    table = schema.tables.get(table_name)
    column = table["columns"].get(field_name) if table else None

    # a declared data type on the column takes precedence
    if column is not None and "data_type" in column:
        return column["data_type"] == "json"

    # unknown column or no declared type: fall back to the schema's preferred type
    return schema.get_preferred_type(field_name) == "json"


@lru_cache(maxsize=None)
def get_nested_row_id_type(schema: Schema, table_name: str) -> Tuple[TRowIdType, bool]:
    """Pick the row id type for a nested table and whether to add linking information.

    Args:
        schema: Schema whose `tables` mapping is searched.
        table_name: Name of the table to inspect.

    Returns:
        `("random", False)` when the table exists, its resolved merge strategy
        is neither "upsert" nor "scd2", and it is not a nested table.
        `("row_hash", True)` in every other case, including when the table is
        not in the schema yet.
    """
    table = schema.tables.get(table_name)
    if not table:
        # table will be created, use standard linking
        return "row_hash", True

    strategy = resolve_merge_strategy(schema.tables, table)
    if strategy in ("upsert", "scd2") or is_nested_table(table):
        return "row_hash", True
    return "random", False


@lru_cache(maxsize=None)
def get_root_row_id_type(schema: Schema, table_name: str) -> TRowIdType:
    """Pick the row id type for a root table from its resolved merge strategy.

    Args:
        schema: Schema whose `tables` mapping is searched.
        table_name: Name of the table to inspect.

    Returns:
        "key_hash" for the "upsert" strategy. "row_hash" for the "scd2"
        strategy when the first column marked `x-row-version` is the
        normalized `C_DLT_ID` column. "random" otherwise, including when the
        table is not in the schema.
    """
    table = schema.tables.get(table_name)
    if not table:
        return "random"

    strategy = resolve_merge_strategy(schema.tables, table)
    if strategy == "upsert":
        return "key_hash"
    if strategy == "scd2":
        version_column = get_first_column_name_with_prop(
            schema.get_table(table_name),
            "x-row-version",
            include_incomplete=True,
        )
        if version_column == schema.naming.normalize_identifier(C_DLT_ID):
            return "row_hash"
    return "random"


def get_row_hash(row: Dict[str, Any], subset: Optional[List[str]] = None) -> str:
    """Returns hash of row.

    Hash includes column names and values and is ordered by column name.
    Can be used as deterministic row identifier.

    Args:
        row: Mapping of column name to value.
        subset: Optional list of column names. When given, exactly the columns
            of `row` whose names are in `subset` are hashed, so a dlt system
            column listed in `subset` IS included. When omitted, every column
            is hashed except those whose name starts with `DLT_NAME_PREFIX`.

    Returns:
        `digest128` of the JSON dump (keys sorted) of the selected columns.
    """
    # Build only the selection that is actually hashed; the prefix filter is
    # not applied when `subset` is given.
    if subset is None:
        hashed_columns = {k: v for k, v in row.items() if not k.startswith(DLT_NAME_PREFIX)}
    else:
        hashed_columns = {k: v for k, v in row.items() if k in subset}
    row_str = json.dumps(hashed_columns, sort_keys=True)
    return digest128(row_str, DLT_ID_LENGTH_BYTES)


def get_nested_row_hash(parent_row_id: str, nested_table: str, list_idx: int) -> str:
    """Return a deterministic id for a row of a nested table.

    Args:
        parent_row_id: Row id of the parent row.
        nested_table: Name of the nested table.
        list_idx: Position of the row within the parent's list.

    Returns:
        `digest128` of "<parent_row_id>_<nested_table>_<list_idx>".
    """
    # the id is unique and stable because all lists are ordered and all nested
    # tables must be lists
    unique_key = f"{parent_row_id}_{nested_table}_{list_idx}"
    return digest128(unique_key, DLT_ID_LENGTH_BYTES)
Loading

0 comments on commit 3bbad35

Please sign in to comment.