diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 6d2bb6846..ff260645a 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -90,7 +90,7 @@ repos:
       - id: text-unicode-replacement-char
   - repo: https://github.com/asottile/pyupgrade
-    rev: v3.18.0
+    rev: v3.19.0
     hooks:
       - id: pyupgrade
         args: [--py37-plus, --keep-runtime-typing]
@@ -107,7 +107,7 @@ repos:
         language_version: python3
   - repo: https://github.com/asottile/blacken-docs
-    rev: 1.19.0
+    rev: 1.19.1
     hooks:
       - id: blacken-docs
         additional_dependencies:
diff --git a/docs/changelog/0.12.2.rst b/docs/changelog/0.12.2.rst
new file mode 100644
index 000000000..d2b930f82
--- /dev/null
+++ b/docs/changelog/0.12.2.rst
@@ -0,0 +1,23 @@
+0.12.2 (2024-11-12)
+===================
+
+Improvements
+------------
+
+- Change Spark ``jobDescription`` for DBReader & FileDFReader from ``DBReader.run() -> Connection`` to ``Connection -> DBReader.run()``.
+
+Bug Fixes
+---------
+
+- Fix ``log_hwm`` result for ``KeyValueIntHWM`` (used by Kafka). (:github:pull:`316`)
+- Fix ``log_collection`` hiding values of ``Kafka.addresses`` in logs with ``INFO`` level. (:github:pull:`316`)
+
+Dependencies
+------------
+
+- Allow using `etl-entities==2.4.0 `_.
+
+Doc only Changes
+----------------
+
+- Fix links to MSSQL date & time type documentation.
diff --git a/docs/changelog/index.rst b/docs/changelog/index.rst
index 812c5437f..8e0370bb9 100644
--- a/docs/changelog/index.rst
+++ b/docs/changelog/index.rst
@@ -3,6 +3,7 @@
     :caption: Changelog

     DRAFT
+    0.12.2
     0.12.1
     0.12.0
     0.11.2
diff --git a/docs/changelog/next_release/+.dependency.rst b/docs/changelog/next_release/+.dependency.rst
new file mode 100644
index 000000000..33ffb4eea
--- /dev/null
+++ b/docs/changelog/next_release/+.dependency.rst
@@ -0,0 +1 @@
+Allow using `etl-entities==2.4.0 `_.
diff --git a/docs/changelog/next_release/+.doc.1.rst b/docs/changelog/next_release/+.doc.1.rst
new file mode 100644
index 000000000..5e1a4f3e2
--- /dev/null
+++ b/docs/changelog/next_release/+.doc.1.rst
@@ -0,0 +1 @@
+Fix links to MSSQL date & time type documentation.
diff --git a/docs/changelog/next_release/+.improvement.1.rst b/docs/changelog/next_release/+.improvement.1.rst
new file mode 100644
index 000000000..e80ec7425
--- /dev/null
+++ b/docs/changelog/next_release/+.improvement.1.rst
@@ -0,0 +1 @@
+Change Spark ``jobDescription`` for DBReader & FileDFReader from ``DBReader.run() -> Connection`` to ``Connection -> DBReader.run()``.
diff --git a/docs/changelog/next_release/316.bugfix.1.rst b/docs/changelog/next_release/316.bugfix.1.rst
new file mode 100644
index 000000000..88bad0477
--- /dev/null
+++ b/docs/changelog/next_release/316.bugfix.1.rst
@@ -0,0 +1 @@
+Fix ``log_hwm`` result for ``KeyValueIntHWM`` (used by Kafka).
diff --git a/docs/changelog/next_release/316.bugfix.2.rst b/docs/changelog/next_release/316.bugfix.2.rst
new file mode 100644
index 000000000..485a73455
--- /dev/null
+++ b/docs/changelog/next_release/316.bugfix.2.rst
@@ -0,0 +1 @@
+Fix ``log_collection`` hiding values of ``Kafka.addresses`` in logs with ``INFO`` level.
diff --git a/docs/connection/db_connection/mssql/execute.rst b/docs/connection/db_connection/mssql/execute.rst
index 85280d923..91305cd7c 100644
--- a/docs/connection/db_connection/mssql/execute.rst
+++ b/docs/connection/db_connection/mssql/execute.rst
@@ -72,7 +72,7 @@ This method supports **any** query syntax supported by MSSQL, like:
 * ✅︎ ``ALTER ...``
 * ✅︎ ``INSERT INTO ... AS SELECT ...``
 * ✅︎ ``DROP TABLE ...``, ``DROP VIEW ...``, and so on
-* ✅︎ ``CALL procedure(arg1, arg2) ...`` or ``{call procedure(arg1, arg2)}`` - special syntax for calling procedure
+* ✅︎ ``EXEC procedure(arg1, arg2) ...`` or ``{call procedure(arg1, arg2)}`` - special syntax for calling procedure
 * ✅︎ ``DECLARE ... BEGIN ... END`` - execute PL/SQL statement
 * ✅︎ other statements not mentioned here
 * ❌ ``SET ...; SELECT ...;`` - multiple statements not supported
diff --git a/docs/connection/db_connection/mssql/types.rst b/docs/connection/db_connection/mssql/types.rst
index 13c7874a9..1770c91b2 100644
--- a/docs/connection/db_connection/mssql/types.rst
+++ b/docs/connection/db_connection/mssql/types.rst
@@ -197,8 +197,7 @@ Temporal types
         So not all of values in Spark DataFrame can be written to MSSQL.

     References:
-        * `Clickhouse DateTime documentation `_
-        * `Clickhouse DateTime documentation `_
+        * `MSSQL date & time types documentation `_
         * `Spark DateType documentation `_
         * `Spark TimestampType documentation `_

@@ -213,7 +212,7 @@ Temporal types
     Last digit will be lost during read or write operations.

 .. [5]
-    ``time`` type is the same as ``timestamp`` with date ``1970-01-01``. So instead of reading data from MSSQL like ``23:59:59.999999``
+    ``time`` type is the same as ``datetime2`` with date ``1970-01-01``. So instead of reading data from MSSQL like ``23:59:59.999999``
     it is actually read ``1970-01-01 23:59:59.999999``, and vice versa.

 String types
diff --git a/onetl/VERSION b/onetl/VERSION
index 34a83616b..26acbf080 100644
--- a/onetl/VERSION
+++ b/onetl/VERSION
@@ -1 +1 @@
-0.12.1
+0.12.2
diff --git a/onetl/connection/db_connection/kafka/connection.py b/onetl/connection/db_connection/kafka/connection.py
index 71fa82bd3..7765936c5 100644
--- a/onetl/connection/db_connection/kafka/connection.py
+++ b/onetl/connection/db_connection/kafka/connection.py
@@ -527,13 +527,14 @@ def get_min_max_values(
             max_offsets[partition_id] = end_offset

         log.info("|%s| Received min and max offset values for each partition.", self.__class__.__name__)
-        for partition_id in sorted(min_offsets.keys()):
+        partitions = sorted(set(min_offsets.keys() | max_offsets.keys()))
+        for partition_id in partitions:
             log.debug(
                 "|%s| Partition %d: Min Offset = %d, Max Offset = %d",
                 self.__class__.__name__,
                 partition_id,
-                min_offsets[partition_id],
-                max_offsets[partition_id],
+                min_offsets.get(partition_id),
+                max_offsets.get(partition_id),
             )

         return min_offsets, max_offsets
diff --git a/onetl/db/db_reader/db_reader.py b/onetl/db/db_reader/db_reader.py
index dd79876a2..2729c662b 100644
--- a/onetl/db/db_reader/db_reader.py
+++ b/onetl/db/db_reader/db_reader.py
@@ -635,7 +635,7 @@ def run(self) -> DataFrame:

         self._check_strategy()

-        job_description = f"{self.__class__.__name__}.run({self.source}) -> {self.connection}"
+        job_description = f"{self.connection} -> {self.__class__.__name__}.run({self.source})"
         with override_job_description(self.connection.spark, job_description):
             if not self._connection_checked:
                 self._log_parameters()
diff --git a/onetl/file/file_df_reader/file_df_reader.py b/onetl/file/file_df_reader/file_df_reader.py
index 36aab796e..8f319a386 100644
--- a/onetl/file/file_df_reader/file_df_reader.py
+++ b/onetl/file/file_df_reader/file_df_reader.py
@@ -212,9 +212,9 @@ def run(self, files: Iterable[str | os.PathLike] | None = None) -> DataFrame:
         self._log_parameters(files)

         if files:
-            job_description = f"{self.__class__.__name__}.run([..files..]) -> {self.connection}"
+            job_description = f"{self.connection} -> {self.__class__.__name__}.run([..files..])"
         else:
-            job_description = f"{self.__class__.__name__}.run({self.source_path}) -> {self.connection}"
+            job_description = f"{self.connection} -> {self.__class__.__name__}.run({self.source_path})"

         with override_job_description(self.connection.spark, job_description):
             paths: FileSet[PurePathProtocol] = FileSet()
diff --git a/onetl/log.py b/onetl/log.py
index d7fdd8e85..d9b1160f6 100644
--- a/onetl/log.py
+++ b/onetl/log.py
@@ -9,7 +9,7 @@
 from contextlib import redirect_stdout
 from enum import Enum
 from textwrap import dedent
-from typing import TYPE_CHECKING, Any, Collection, Iterable
+from typing import TYPE_CHECKING, Any, Collection, Iterable, Mapping, Set

 from etl_entities.hwm import HWM
 from typing_extensions import deprecated
@@ -315,6 +315,8 @@ def log_collection(

         log_collection(logger, "myvar", [])
         log_collection(logger, "myvar", ["item1", {"item2": "value2"}, None])
+        log_collection(logger, "myvar", {"item1", "item2", None})
+        log_collection(logger, "myvar", {"key1": "value1", "key2": None})
         log_collection(logger, "myvar", ["item1", "item2", "item3"], max_items=1)
         log_collection(
             logger,
@@ -334,6 +336,17 @@ def log_collection(
         INFO     onetl.module        None,
         INFO     onetl.module    ]

+        INFO     onetl.module    myvar = {
+        INFO     onetl.module        'item1',
+        INFO     onetl.module        'item2',
+        INFO     onetl.module        None,
+        INFO     onetl.module    }
+
+        INFO     onetl.module    myvar = {
+        INFO     onetl.module        'key1': 'value1',
+        INFO     onetl.module        'key2': None,
+        INFO     onetl.module    }
+
         INFO     onetl.module    myvar = [
         INFO     onetl.module        'item1',
         INFO     onetl.module        # ... 2 more items of type
@@ -350,21 +363,30 @@ def log_collection(
     base_indent = " " * (BASE_LOG_INDENT + indent)
     stacklevel += 1
-    items = list(collection)  # force convert all iterators to list to know size
-    if not items:
-        _log(logger, "%s%s = []", base_indent, name, level=level, stacklevel=stacklevel)
+
+    if not isinstance(collection, (Mapping, Set)):
+        collection = list(collection)  # force convert all iterators to list to know size
+
+    start_bracket = "["
+    end_bracket = "]"
+    if isinstance(collection, (Mapping, Set)):
+        start_bracket = "{"
+        end_bracket = "}"
+
+    if not collection:
+        _log(logger, "%s%s = %s%s", base_indent, name, start_bracket, end_bracket, level=level, stacklevel=stacklevel)
         return

     nested_indent = " " * (BASE_LOG_INDENT + indent + 4)
-    _log(logger, "%s%s = [", base_indent, name, level=level, stacklevel=stacklevel)
+    _log(logger, "%s%s = %s", base_indent, name, start_bracket, level=level, stacklevel=stacklevel)

-    for i, item in enumerate(items, start=1):
-        if max_items and i > max_items and level >= logging.DEBUG:
+    for i, item in enumerate(sorted(collection), start=1):
+        if max_items and i > max_items and level > logging.DEBUG:
             _log(
                 logger,
                 "%s# ... %d more items of type %r",
                 nested_indent,
-                len(items) - max_items,
+                len(collection) - max_items,
                 type(item),
                 level=level,
                 stacklevel=stacklevel,
             )
@@ -377,9 +399,13 @@ def log_collection(
                 stacklevel=stacklevel,
             )
             break
-        _log(logger, "%s%r,", nested_indent, item, level=level, stacklevel=stacklevel)

-    _log(logger, "%s]", base_indent, level=level, stacklevel=stacklevel)
+        if isinstance(collection, Mapping):
+            _log(logger, "%s%r: %r,", nested_indent, item, collection[item], level=level, stacklevel=stacklevel)
+        else:
+            _log(logger, "%s%r,", nested_indent, item, level=level, stacklevel=stacklevel)
+
+    _log(logger, "%s%s", base_indent, end_bracket, level=level, stacklevel=stacklevel)


 def entity_boundary_log(logger: logging.Logger, msg: str, char: str = "=", stacklevel: int = 1) -> None:
diff --git a/requirements/core.txt b/requirements/core.txt
index c8e245a4a..5d915ebea 100644
--- a/requirements/core.txt
+++ b/requirements/core.txt
@@ -1,4 +1,4 @@
-etl-entities>=2.2,<2.4
+etl-entities>=2.2,<2.5
 evacuator>=1.0,<1.1
 frozendict
 humanize
diff --git a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py
index 7ae146cca..d506394d0 100644
--- a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py
+++ b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py
@@ -28,7 +28,6 @@ def test_kafka_strategy_incremental(
 ):
     from pyspark.sql.functions import max as spark_max

-    hwm_type = KeyValueIntHWM
     hwm_name = secrets.token_hex(5)
     store = HWMStoreStackManager.get_current()

@@ -77,7 +76,7 @@ def test_kafka_strategy_incremental(

     hwm = store.get_hwm(hwm_name)
     assert hwm is not None
-    assert isinstance(hwm, hwm_type)
+    assert isinstance(hwm, KeyValueIntHWM)

     # HWM contains mapping `partition: max offset + 1`
     partition_offsets_initial = dict.fromkeys(range(num_partitions or 1), 0)