diff --git a/Makefile b/Makefile index bd425b0e42..19751f54b2 100644 --- a/Makefile +++ b/Makefile @@ -49,9 +49,10 @@ dev: has-poetry lint: ./check-package.sh poetry run mypy --config-file mypy.ini dlt tests - poetry run flake8 --max-line-length=200 dlt - poetry run flake8 --max-line-length=200 tests --exclude tests/reflection/module_cases - poetry run black dlt docs tests --diff --extend-exclude=".*syntax_error.py" + poetry run ruff check dlt tests + # poetry run flake8 --max-line-length=200 dlt + # poetry run flake8 --max-line-length=200 tests --exclude tests/reflection/module_cases + # poetry run black dlt docs tests --diff --extend-exclude=".*syntax_error.py" # poetry run isort ./ --diff # $(MAKE) lint-security diff --git a/dlt/cli/deploy_command.py b/dlt/cli/deploy_command.py index 5a25752a6d..1a8c297975 100644 --- a/dlt/cli/deploy_command.py +++ b/dlt/cli/deploy_command.py @@ -134,9 +134,9 @@ def _create_new_workflow(self) -> Any: ) as f: workflow = yaml.safe_load(f) # customize the workflow - workflow["name"] = ( - f"Run {self.state['pipeline_name']} pipeline from {self.pipeline_script_path}" - ) + workflow[ + "name" + ] = f"Run {self.state['pipeline_name']} pipeline from {self.pipeline_script_path}" if self.run_on_push is False: del workflow["on"]["push"] if self.run_manually is False: @@ -196,8 +196,7 @@ def _echo_instructions(self, *args: Optional[Any]) -> None: fmt.echo( "* The dependencies that will be used to run the pipeline are stored in %s. If you" " change add more dependencies, remember to refresh your deployment by running the same" - " 'deploy' command again." - % fmt.bold(self.artifacts["requirements_txt_name"]) + " 'deploy' command again." % fmt.bold(self.artifacts["requirements_txt_name"]) ) fmt.echo() if len(self.secret_envs) == 0 and len(self.envs) == 0: diff --git a/dlt/common/configuration/inject.py b/dlt/common/configuration/inject.py index a22f299ae8..a3b7186dd1 100644 --- a/dlt/common/configuration/inject.py +++ b/dlt/common/configuration/inject.py @@ -32,7 +32,8 @@ def with_config( auto_pipeline_section: bool = False, include_defaults: bool = True, accept_partial: bool = False, -) -> TFun: ... +) -> TFun: + ... @overload @@ -45,7 +46,8 @@ def with_config( auto_pipeline_section: bool = False, include_defaults: bool = True, accept_partial: bool = False, -) -> Callable[[TFun], TFun]: ... +) -> Callable[[TFun], TFun]: + ... def with_config( diff --git a/dlt/common/configuration/resolve.py b/dlt/common/configuration/resolve.py index db69cd9572..24fb5e03c2 100644 --- a/dlt/common/configuration/resolve.py +++ b/dlt/common/configuration/resolve.py @@ -44,7 +44,7 @@ def resolve_configuration( *, sections: Tuple[str, ...] = (), explicit_value: Any = None, - accept_partial: bool = False + accept_partial: bool = False, ) -> TConfiguration: if not isinstance(config, BaseConfiguration): raise ConfigurationWrongTypeException(type(config)) diff --git a/dlt/common/configuration/specs/base_configuration.py b/dlt/common/configuration/specs/base_configuration.py index 84f59fa894..d0d3699cce 100644 --- a/dlt/common/configuration/specs/base_configuration.py +++ b/dlt/common/configuration/specs/base_configuration.py @@ -105,11 +105,13 @@ def is_secret_hint(hint: Type[Any]) -> bool: @overload -def configspec(cls: Type[TAnyClass]) -> Type[TAnyClass]: ... +def configspec(cls: Type[TAnyClass]) -> Type[TAnyClass]: + ... @overload -def configspec(cls: None = ...) -> Callable[[Type[TAnyClass]], Type[TAnyClass]]: ... +def configspec(cls: None = ...) 
-> Callable[[Type[TAnyClass]], Type[TAnyClass]]: + ... def configspec( diff --git a/dlt/common/configuration/specs/config_section_context.py b/dlt/common/configuration/specs/config_section_context.py index a656a2b0fe..f211947ded 100644 --- a/dlt/common/configuration/specs/config_section_context.py +++ b/dlt/common/configuration/specs/config_section_context.py @@ -79,4 +79,5 @@ def __init__( sections: Tuple[str, ...] = (), merge_style: TMergeFunc = None, source_state_key: str = None, - ) -> None: ... + ) -> None: + ... diff --git a/dlt/common/data_writers/buffered.py b/dlt/common/data_writers/buffered.py index 24935d73ac..5a5e82953d 100644 --- a/dlt/common/data_writers/buffered.py +++ b/dlt/common/data_writers/buffered.py @@ -45,7 +45,7 @@ def __init__( file_max_items: int = None, file_max_bytes: int = None, disable_compression: bool = False, - _caps: DestinationCapabilitiesContext = None + _caps: DestinationCapabilitiesContext = None, ): self.file_format = file_format self._file_format_spec = DataWriter.data_format_from_file_format(self.file_format) @@ -197,7 +197,9 @@ def _flush_items(self, allow_empty_file: bool = False) -> None: self._file = self.open(self._file_name, "wb") # type: ignore else: self._file = self.open(self._file_name, "wt", encoding="utf-8") # type: ignore - self._writer = DataWriter.from_file_format(self.file_format, self._file, caps=self._caps) # type: ignore[assignment] + self._writer = DataWriter.from_file_format( + self.file_format, self._file, caps=self._caps + ) # type: ignore[assignment] self._writer.write_header(self._current_columns) # write buffer if self._buffered_items: diff --git a/dlt/common/data_writers/escape.py b/dlt/common/data_writers/escape.py index 5bf8f29ccb..139f2086a0 100644 --- a/dlt/common/data_writers/escape.py +++ b/dlt/common/data_writers/escape.py @@ -20,7 +20,10 @@ def _make_sql_escape_re(escape_dict: Dict[str, str]) -> re.Pattern: # type: ign def _escape_extended( - v: str, prefix: str = "E'", escape_dict: Dict[str, str] = None, escape_re: re.Pattern = None # type: ignore[type-arg] + v: str, + prefix: str = "E'", + escape_dict: Dict[str, str] = None, + escape_re: re.Pattern = None, # type: ignore[type-arg] ) -> str: escape_dict = escape_dict or SQL_ESCAPE_DICT escape_re = escape_re or SQL_ESCAPE_RE diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 1c28dffa8c..7dce6a4023 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -74,9 +74,9 @@ class StateInfo(NamedTuple): class DestinationClientConfiguration(BaseConfiguration): destination_type: Final[str] = None # which destination to load data to credentials: Optional[CredentialsConfiguration] - destination_name: Optional[str] = ( - None # name of the destination, if not set, destination_type is used - ) + destination_name: Optional[ + str + ] = None # name of the destination, if not set, destination_type is used environment: Optional[str] = None def fingerprint(self) -> str: @@ -98,7 +98,8 @@ def __init__( credentials: Optional[CredentialsConfiguration] = None, destination_name: str = None, environment: str = None, - ) -> None: ... + ) -> None: + ... @configspec @@ -143,7 +144,8 @@ def __init__( default_schema_name: Optional[str] = None, destination_name: str = None, environment: str = None, - ) -> None: ... + ) -> None: + ... @configspec @@ -171,7 +173,8 @@ def __init__( layout: str = None, destination_name: str = None, environment: str = None, - ) -> None: ... + ) -> None: + ... 
@configspec @@ -191,7 +194,8 @@ def __init__( staging_config: Optional[DestinationClientStagingConfiguration] = None, destination_name: str = None, environment: str = None, - ) -> None: ... + ) -> None: + ... TLoadJobState = Literal["running", "failed", "retry", "completed"] diff --git a/dlt/common/json/__init__.py b/dlt/common/json/__init__.py index e9b52cc382..b56ee23a90 100644 --- a/dlt/common/json/__init__.py +++ b/dlt/common/json/__init__.py @@ -24,29 +24,38 @@ class SupportsJson(Protocol): _impl_name: str """Implementation name""" - def dump( - self, obj: Any, fp: IO[bytes], sort_keys: bool = False, pretty: bool = False - ) -> None: ... + def dump(self, obj: Any, fp: IO[bytes], sort_keys: bool = False, pretty: bool = False) -> None: + ... - def typed_dump(self, obj: Any, fp: IO[bytes], pretty: bool = False) -> None: ... + def typed_dump(self, obj: Any, fp: IO[bytes], pretty: bool = False) -> None: + ... - def typed_dumps(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> str: ... + def typed_dumps(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> str: + ... - def typed_loads(self, s: str) -> Any: ... + def typed_loads(self, s: str) -> Any: + ... - def typed_dumpb(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> bytes: ... + def typed_dumpb(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> bytes: + ... - def typed_loadb(self, s: Union[bytes, bytearray, memoryview]) -> Any: ... + def typed_loadb(self, s: Union[bytes, bytearray, memoryview]) -> Any: + ... - def dumps(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> str: ... + def dumps(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> str: + ... - def dumpb(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> bytes: ... + def dumpb(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> bytes: + ... - def load(self, fp: Union[IO[bytes], IO[str]]) -> Any: ... + def load(self, fp: Union[IO[bytes], IO[str]]) -> Any: + ... - def loads(self, s: str) -> Any: ... + def loads(self, s: str) -> Any: + ... - def loadb(self, s: Union[bytes, bytearray, memoryview]) -> Any: ... + def loadb(self, s: Union[bytes, bytearray, memoryview]) -> Any: + ... def custom_encode(obj: Any) -> str: diff --git a/dlt/common/libs/pydantic.py b/dlt/common/libs/pydantic.py index 58829f0592..827f6bf8e3 100644 --- a/dlt/common/libs/pydantic.py +++ b/dlt/common/libs/pydantic.py @@ -71,7 +71,7 @@ class DltConfig(TypedDict, total=False): def pydantic_to_table_schema_columns( - model: Union[BaseModel, Type[BaseModel]] + model: Union[BaseModel, Type[BaseModel]], ) -> TTableSchemaColumns: """Convert a pydantic model to a table schema columns dict @@ -261,7 +261,8 @@ def create_list_model( # TODO: use LenientList to create list model that automatically discards invalid items # https://github.com/pydantic/pydantic/issues/2274 and https://gist.github.com/dmontagu/7f0cef76e5e0e04198dd608ad7219573 return create_model( - "List" + __name__, items=(List[model], ...) 
# type: ignore[return-value,valid-type] + "List" + __name__, + items=(List[model], ...), # type: ignore[return-value,valid-type] ) diff --git a/dlt/common/normalizers/configuration.py b/dlt/common/normalizers/configuration.py index 6957417f9d..d26a5da7c6 100644 --- a/dlt/common/normalizers/configuration.py +++ b/dlt/common/normalizers/configuration.py @@ -25,4 +25,5 @@ def on_resolved(self) -> None: if TYPE_CHECKING: - def __init__(self, naming: str = None, json_normalizer: TJSONNormalizer = None) -> None: ... + def __init__(self, naming: str = None, json_normalizer: TJSONNormalizer = None) -> None: + ... diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index 6b7b308b44..0ca0c6f511 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -524,7 +524,8 @@ def run( schema: Schema = None, loader_file_format: TLoaderFileFormat = None, schema_contract: TSchemaContract = None, - ) -> LoadInfo: ... + ) -> LoadInfo: + ... def _set_context(self, is_active: bool) -> None: """Called when pipeline context activated or deactivate""" @@ -546,7 +547,8 @@ def __call__( schema: Schema = None, loader_file_format: TLoaderFileFormat = None, schema_contract: TSchemaContract = None, - ) -> LoadInfo: ... + ) -> LoadInfo: + ... @configspec @@ -596,7 +598,8 @@ class StateInjectableContext(ContainerInjectableContext): if TYPE_CHECKING: - def __init__(self, state: TPipelineState = None) -> None: ... + def __init__(self, state: TPipelineState = None) -> None: + ... def pipeline_state( diff --git a/dlt/common/runners/configuration.py b/dlt/common/runners/configuration.py index c5de2353f4..aae934e61e 100644 --- a/dlt/common/runners/configuration.py +++ b/dlt/common/runners/configuration.py @@ -25,4 +25,5 @@ def __init__( start_method: str = None, workers: int = None, run_sleep: float = 0.1, - ) -> None: ... + ) -> None: + ... diff --git a/dlt/common/runtime/logger.py b/dlt/common/runtime/logger.py index 9dd8ce4e3a..42cce9c912 100644 --- a/dlt/common/runtime/logger.py +++ b/dlt/common/runtime/logger.py @@ -14,7 +14,8 @@ class LogMethod(Protocol): - def __call__(self, msg: str, *args: Any, **kwds: Any) -> None: ... + def __call__(self, msg: str, *args: Any, **kwds: Any) -> None: + ... def __getattr__(name: str) -> LogMethod: diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index e95699b91e..25a82c8964 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -889,7 +889,9 @@ def _compile_settings(self) -> None: map(utils.compile_simple_regex, table["filters"]["includes"]) ) # look for auto-detections in settings and then normalizer - self._type_detections = self._settings.get("detections") or self._normalizers_config.get("detections") or [] # type: ignore + self._type_detections = ( + self._settings.get("detections") or self._normalizers_config.get("detections") or [] + ) # type: ignore def __repr__(self) -> str: return f"Schema {self.name} at {id(self)}" diff --git a/dlt/common/storages/configuration.py b/dlt/common/storages/configuration.py index 2279698dda..fc63df85b8 100644 --- a/dlt/common/storages/configuration.py +++ b/dlt/common/storages/configuration.py @@ -38,7 +38,8 @@ def __init__( schema_volume_path: str = None, import_schema_path: str = None, export_schema_path: str = None, - ) -> None: ... + ) -> None: + ... @configspec @@ -47,7 +48,8 @@ class NormalizeStorageConfiguration(BaseConfiguration): if TYPE_CHECKING: - def __init__(self, normalize_volume_path: str = None) -> None: ... 
+ def __init__(self, normalize_volume_path: str = None) -> None: + ... @configspec @@ -63,7 +65,8 @@ class LoadStorageConfiguration(BaseConfiguration): def __init__( self, load_volume_path: str = None, delete_completed_jobs: bool = None - ) -> None: ... + ) -> None: + ... FileSystemCredentials = Union[ diff --git a/dlt/common/storages/load_package.py b/dlt/common/storages/load_package.py index 2860364cd0..ee00bdf69c 100644 --- a/dlt/common/storages/load_package.py +++ b/dlt/common/storages/load_package.py @@ -177,7 +177,9 @@ def asstr(self, verbosity: int = 0) -> str: f" {completed_msg}.\n" ) msg += "Jobs details:\n" - msg += "\n".join(job.asstr(verbosity) for job in flatten_list_or_items(iter(self.jobs.values()))) # type: ignore + msg += "\n".join( + job.asstr(verbosity) for job in flatten_list_or_items(iter(self.jobs.values())) + ) # type: ignore return msg def __str__(self) -> str: diff --git a/dlt/common/storages/normalize_storage.py b/dlt/common/storages/normalize_storage.py index 8a247c2021..cd909eeb6b 100644 --- a/dlt/common/storages/normalize_storage.py +++ b/dlt/common/storages/normalize_storage.py @@ -17,9 +17,9 @@ class NormalizeStorage(VersionedStorage): STORAGE_VERSION: ClassVar[str] = "1.0.1" - EXTRACTED_FOLDER: ClassVar[str] = ( - "extracted" # folder within the volume where extracted files to be normalized are stored - ) + EXTRACTED_FOLDER: ClassVar[ + str + ] = "extracted" # folder within the volume where extracted files to be normalized are stored @with_config(spec=NormalizeStorageConfiguration, sections=(known_sections.NORMALIZE,)) def __init__( diff --git a/dlt/common/time.py b/dlt/common/time.py index ed390c28bf..0e8768bcc4 100644 --- a/dlt/common/time.py +++ b/dlt/common/time.py @@ -139,7 +139,7 @@ def ensure_pendulum_time(value: Union[str, datetime.time]) -> pendulum.Time: def _datetime_from_ts_or_iso( - value: Union[int, float, str] + value: Union[int, float, str], ) -> Union[pendulum.DateTime, pendulum.Date, pendulum.Time]: if isinstance(value, (int, float)): return pendulum.from_timestamp(value) @@ -172,4 +172,8 @@ def to_seconds(td: Optional[TimedeltaSeconds]) -> Optional[float]: def reduce_pendulum_datetime_precision(value: T, microsecond_precision: int) -> T: if microsecond_precision >= 6: return value - return value.replace(microsecond=value.microsecond // 10 ** (6 - microsecond_precision) * 10 ** (6 - microsecond_precision)) # type: ignore + return value.replace( + microsecond=value.microsecond + // 10 ** (6 - microsecond_precision) + * 10 ** (6 - microsecond_precision) + ) # type: ignore diff --git a/dlt/common/typing.py b/dlt/common/typing.py index b6a27a98a7..18a6827581 100644 --- a/dlt/common/typing.py +++ b/dlt/common/typing.py @@ -79,7 +79,8 @@ class SupportsVariant(Protocol, Generic[TVariantBase]): See `Wei` type declaration which returns Decimal or str for values greater than supported by destination warehouse. """ - def __call__(self) -> Union[TVariantBase, TVariantRV]: ... + def __call__(self) -> Union[TVariantBase, TVariantRV]: + ... 
class SupportsHumanize(Protocol): @@ -232,7 +233,7 @@ def get_generic_type_argument_from_instance( def copy_sig( - wrapper: Callable[TInputArgs, Any] + wrapper: Callable[TInputArgs, Any], ) -> Callable[[Callable[..., TReturnVal]], Callable[TInputArgs, TReturnVal]]: """Copies docstring and signature from wrapper to func but keeps the func return value type""" diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index 4837f0dbdf..227ea4634e 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -351,9 +351,7 @@ def _from_db_type( return self.type_mapper.from_db_type(hive_t, precision, scale) def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str: - return ( - f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table_format)}" - ) + return f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table_format)}" def _get_table_update_sql( self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool @@ -378,15 +376,19 @@ def _get_table_update_sql( # use qualified table names qualified_table_name = self.sql_client.make_qualified_ddl_table_name(table_name) if is_iceberg and not generate_alter: - sql.append(f"""CREATE TABLE {qualified_table_name} + sql.append( + f"""CREATE TABLE {qualified_table_name} ({columns}) LOCATION '{location}' - TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');""") + TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');""" + ) elif not generate_alter: - sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name} + sql.append( + f"""CREATE EXTERNAL TABLE {qualified_table_name} ({columns}) STORED AS PARQUET - LOCATION '{location}';""") + LOCATION '{location}';""" + ) # alter table to add new columns at the end else: sql.append(f"""ALTER TABLE {qualified_table_name} ADD COLUMNS ({columns});""") diff --git a/dlt/destinations/impl/athena/configuration.py b/dlt/destinations/impl/athena/configuration.py index 6b985f284a..2d805d6a87 100644 --- a/dlt/destinations/impl/athena/configuration.py +++ b/dlt/destinations/impl/athena/configuration.py @@ -38,4 +38,5 @@ def __init__( force_iceberg: Optional[bool] = False, destination_name: str = None, environment: str = None, - ) -> None: ... + ) -> None: + ... diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py index fa4f5f0419..4a0eb7b71b 100644 --- a/dlt/destinations/impl/bigquery/bigquery.py +++ b/dlt/destinations/impl/bigquery/bigquery.py @@ -295,9 +295,7 @@ def _get_table_update_sql( def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str: name = self.capabilities.escape_identifier(c["name"]) - return ( - f"{name} {self.type_mapper.to_db_type(c, table_format)} {self._gen_not_null(c.get('nullable', True))}" - ) + return f"{name} {self.type_mapper.to_db_type(c, table_format)} {self._gen_not_null(c.get('nullable', True))}" def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]: schema_table: TTableSchemaColumns = {} diff --git a/dlt/destinations/impl/bigquery/configuration.py b/dlt/destinations/impl/bigquery/configuration.py index bf41d38aff..39c19a5f98 100644 --- a/dlt/destinations/impl/bigquery/configuration.py +++ b/dlt/destinations/impl/bigquery/configuration.py @@ -53,4 +53,5 @@ def __init__( retry_deadline: float = 60.0, destination_name: str = None, environment: str = None, - ) -> None: ... + ) -> None: + ... 
diff --git a/dlt/destinations/impl/duckdb/configuration.py b/dlt/destinations/impl/duckdb/configuration.py index 8cb88c43b5..6a66135511 100644 --- a/dlt/destinations/impl/duckdb/configuration.py +++ b/dlt/destinations/impl/duckdb/configuration.py @@ -219,4 +219,5 @@ def __init__( staging_config: Optional[DestinationClientStagingConfiguration] = None, destination_name: str = None, environment: str = None, - ) -> None: ... + ) -> None: + ... diff --git a/dlt/destinations/impl/duckdb/duck.py b/dlt/destinations/impl/duckdb/duck.py index 735a4ce7e3..5ed502886c 100644 --- a/dlt/destinations/impl/duckdb/duck.py +++ b/dlt/destinations/impl/duckdb/duck.py @@ -169,9 +169,7 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non if c.get(h, False) is True ) column_name = self.capabilities.escape_identifier(c["name"]) - return ( - f"{column_name} {self.type_mapper.to_db_type(c)} {hints_str} {self._gen_not_null(c.get('nullable', True))}" - ) + return f"{column_name} {self.type_mapper.to_db_type(c)} {hints_str} {self._gen_not_null(c.get('nullable', True))}" def _from_db_type( self, pq_t: str, precision: Optional[int], scale: Optional[int] diff --git a/dlt/destinations/impl/dummy/configuration.py b/dlt/destinations/impl/dummy/configuration.py index 82dc797126..94f1e8417c 100644 --- a/dlt/destinations/impl/dummy/configuration.py +++ b/dlt/destinations/impl/dummy/configuration.py @@ -45,4 +45,5 @@ def __init__( fail_in_init: bool = None, destination_name: str = None, environment: str = None, - ) -> None: ... + ) -> None: + ... diff --git a/dlt/destinations/impl/filesystem/configuration.py b/dlt/destinations/impl/filesystem/configuration.py index 93e5537aab..af0c53e048 100644 --- a/dlt/destinations/impl/filesystem/configuration.py +++ b/dlt/destinations/impl/filesystem/configuration.py @@ -11,7 +11,9 @@ @configspec -class FilesystemDestinationClientConfiguration(FilesystemConfiguration, DestinationClientStagingConfiguration): # type: ignore[misc] +class FilesystemDestinationClientConfiguration( + FilesystemConfiguration, DestinationClientStagingConfiguration +): # type: ignore[misc] destination_type: Final[str] = "filesystem" # type: ignore @resolve_type("credentials") @@ -30,4 +32,5 @@ def __init__( bucket_url: str = None, destination_name: str = None, environment: str = None, - ) -> None: ... + ) -> None: + ... diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 5885f8a1ec..058c8f8aac 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -65,9 +65,7 @@ def make_destination_filename( ) def make_remote_path(self) -> str: - return ( - f"{self.config.protocol}://{posixpath.join(self.dataset_path, self.destination_file_name)}" - ) + return f"{self.config.protocol}://{posixpath.join(self.dataset_path, self.destination_file_name)}" def state(self) -> TLoadJobState: return "completed" diff --git a/dlt/destinations/impl/motherduck/configuration.py b/dlt/destinations/impl/motherduck/configuration.py index f4ab571e5c..f719003ce8 100644 --- a/dlt/destinations/impl/motherduck/configuration.py +++ b/dlt/destinations/impl/motherduck/configuration.py @@ -81,7 +81,8 @@ def __init__( create_indexes: Optional[bool] = None, destination_name: str = None, environment: str = None, - ) -> None: ... + ) -> None: + ... 
class MotherduckLocalVersionNotSupported(DestinationTerminalException): diff --git a/dlt/destinations/impl/mssql/configuration.py b/dlt/destinations/impl/mssql/configuration.py index f33aca4b82..99729298cc 100644 --- a/dlt/destinations/impl/mssql/configuration.py +++ b/dlt/destinations/impl/mssql/configuration.py @@ -105,4 +105,5 @@ def __init__( create_indexes: Optional[bool] = None, destination_name: str = None, environment: str = None, - ) -> None: ... + ) -> None: + ... diff --git a/dlt/destinations/impl/postgres/configuration.py b/dlt/destinations/impl/postgres/configuration.py index 1433d9f641..cfffc0edaf 100644 --- a/dlt/destinations/impl/postgres/configuration.py +++ b/dlt/destinations/impl/postgres/configuration.py @@ -58,4 +58,5 @@ def __init__( create_indexes: bool = True, destination_name: str = None, environment: str = None, - ) -> None: ... + ) -> None: + ... diff --git a/dlt/destinations/impl/postgres/postgres.py b/dlt/destinations/impl/postgres/postgres.py index f8fa3e341a..a82b90b3bb 100644 --- a/dlt/destinations/impl/postgres/postgres.py +++ b/dlt/destinations/impl/postgres/postgres.py @@ -122,9 +122,7 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non if c.get(h, False) is True ) column_name = self.capabilities.escape_identifier(c["name"]) - return ( - f"{column_name} {self.type_mapper.to_db_type(c)} {hints_str} {self._gen_not_null(c.get('nullable', True))}" - ) + return f"{column_name} {self.type_mapper.to_db_type(c)} {hints_str} {self._gen_not_null(c.get('nullable', True))}" def _create_replace_followup_jobs( self, table_chain: Sequence[TTableSchema] diff --git a/dlt/destinations/impl/redshift/configuration.py b/dlt/destinations/impl/redshift/configuration.py index 2a6ade4a4f..e5d85dec3e 100644 --- a/dlt/destinations/impl/redshift/configuration.py +++ b/dlt/destinations/impl/redshift/configuration.py @@ -42,4 +42,5 @@ def __init__( staging_iam_role: str = None, destination_name: str = None, environment: str = None, - ) -> None: ... + ) -> None: + ... 
diff --git a/dlt/destinations/impl/redshift/redshift.py b/dlt/destinations/impl/redshift/redshift.py index eaa1968133..ad97805f59 100644 --- a/dlt/destinations/impl/redshift/redshift.py +++ b/dlt/destinations/impl/redshift/redshift.py @@ -188,13 +188,15 @@ def execute(self, table: TTableSchema, bucket_path: str) -> None: with self._sql_client.begin_transaction(): dataset_name = self._sql_client.dataset_name # TODO: if we ever support csv here remember to add column names to COPY - self._sql_client.execute_sql(f""" + self._sql_client.execute_sql( + f""" COPY {dataset_name}.{table_name} FROM '{bucket_path}' {file_type} {dateformat} {compression} - {credentials} MAXERROR 0;""") + {credentials} MAXERROR 0;""" + ) def exception(self) -> str: # this part of code should be never reached @@ -245,9 +247,7 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non if c.get(h, False) is True ) column_name = self.capabilities.escape_identifier(c["name"]) - return ( - f"{column_name} {self.type_mapper.to_db_type(c)} {hints_str} {self._gen_not_null(c.get('nullable', True))}" - ) + return f"{column_name} {self.type_mapper.to_db_type(c)} {hints_str} {self._gen_not_null(c.get('nullable', True))}" def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob: """Starts SqlLoadJob for files ending with .sql or returns None to let derived classes to handle their specific jobs""" diff --git a/dlt/destinations/impl/snowflake/configuration.py b/dlt/destinations/impl/snowflake/configuration.py index 01f5ca6e03..0e0f3daa38 100644 --- a/dlt/destinations/impl/snowflake/configuration.py +++ b/dlt/destinations/impl/snowflake/configuration.py @@ -145,4 +145,5 @@ def __init__( keep_staged_files: bool = True, destination_name: str = None, environment: str = None, - ) -> None: ... + ) -> None: + ... diff --git a/dlt/destinations/impl/snowflake/snowflake.py b/dlt/destinations/impl/snowflake/snowflake.py index 67df78c138..ddebbde70a 100644 --- a/dlt/destinations/impl/snowflake/snowflake.py +++ b/dlt/destinations/impl/snowflake/snowflake.py @@ -175,13 +175,15 @@ def __init__( f'PUT file://{file_path} @{stage_name}/"{load_id}" OVERWRITE = TRUE,' " AUTO_COMPRESS = FALSE" ) - client.execute_sql(f"""COPY INTO {qualified_table_name} + client.execute_sql( + f"""COPY INTO {qualified_table_name} {from_clause} {files_clause} {credentials_clause} FILE_FORMAT = {source_format} MATCH_BY_COLUMN_NAME='CASE_INSENSITIVE' - """) + """ + ) if stage_file_path and not keep_staged_files: client.execute_sql(f"REMOVE {stage_file_path}") @@ -293,5 +295,7 @@ def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns] return exists, table # Snowflake converts all unquoted columns to UPPER CASE # Convert back to lower case to enable comparison with dlt schema - table = {col_name.lower(): dict(col, name=col_name.lower()) for col_name, col in table.items()} # type: ignore + table = { + col_name.lower(): dict(col, name=col_name.lower()) for col_name, col in table.items() + } # type: ignore return exists, table diff --git a/dlt/destinations/impl/weaviate/configuration.py b/dlt/destinations/impl/weaviate/configuration.py index 5014e69163..a806eaa3ba 100644 --- a/dlt/destinations/impl/weaviate/configuration.py +++ b/dlt/destinations/impl/weaviate/configuration.py @@ -80,4 +80,5 @@ def __init__( dataset_separator: str = None, vectorizer: str = None, module_config: Dict[str, Dict[str, str]] = None, - ) -> None: ... + ) -> None: + ... 
diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py index d97a098669..ccd4886f66 100644 --- a/dlt/destinations/sql_jobs.py +++ b/dlt/destinations/sql_jobs.py @@ -286,10 +286,11 @@ def gen_merge_sql( ) # create temp table used to deduplicate, only when we have primary keys if primary_keys: - create_insert_temp_table_sql, insert_temp_table_name = ( - cls.gen_insert_temp_table_sql( - staging_root_table_name, primary_keys, unique_column - ) + ( + create_insert_temp_table_sql, + insert_temp_table_name, + ) = cls.gen_insert_temp_table_sql( + staging_root_table_name, primary_keys, unique_column ) sql.extend(create_insert_temp_table_sql) diff --git a/dlt/destinations/typing.py b/dlt/destinations/typing.py index 99ffed01fd..1137548221 100644 --- a/dlt/destinations/typing.py +++ b/dlt/destinations/typing.py @@ -10,9 +10,11 @@ class DBTransaction(Protocol): - def commit_transaction(self) -> None: ... + def commit_transaction(self) -> None: + ... - def rollback_transaction(self) -> None: ... + def rollback_transaction(self) -> None: + ... class DBApi(Protocol): @@ -29,11 +31,20 @@ class DBApiCursor(Protocol): native_cursor: "DBApiCursor" """Cursor implementation native to current destination""" - def execute(self, query: AnyStr, *args: Any, **kwargs: Any) -> None: ... - def fetchall(self) -> List[Tuple[Any, ...]]: ... - def fetchmany(self, size: int = ...) -> List[Tuple[Any, ...]]: ... - def fetchone(self) -> Optional[Tuple[Any, ...]]: ... - def close(self) -> None: ... + def execute(self, query: AnyStr, *args: Any, **kwargs: Any) -> None: + ... + + def fetchall(self) -> List[Tuple[Any, ...]]: + ... + + def fetchmany(self, size: int = ...) -> List[Tuple[Any, ...]]: + ... + + def fetchone(self) -> Optional[Tuple[Any, ...]]: + ... + + def close(self) -> None: + ... def df(self, chunk_size: int = None, **kwargs: None) -> Optional[DataFrame]: """Fetches the results as data frame. For large queries the results may be chunked diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index cf7426e683..0dc7f63835 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -75,7 +75,8 @@ class SourceSchemaInjectableContext(ContainerInjectableContext): if TYPE_CHECKING: - def __init__(self, schema: Schema = None) -> None: ... + def __init__(self, schema: Schema = None) -> None: + ... TSourceFunParams = ParamSpec("TSourceFunParams") @@ -95,7 +96,8 @@ def source( schema_contract: TSchemaContract = None, spec: Type[BaseConfiguration] = None, _impl_cls: Type[TDltSourceImpl] = DltSource, # type: ignore[assignment] -) -> Callable[TSourceFunParams, DltSource]: ... +) -> Callable[TSourceFunParams, DltSource]: + ... @overload @@ -110,7 +112,8 @@ def source( schema_contract: TSchemaContract = None, spec: Type[BaseConfiguration] = None, _impl_cls: Type[TDltSourceImpl] = DltSource, # type: ignore[assignment] -) -> Callable[[Callable[TSourceFunParams, Any]], Callable[TSourceFunParams, TDltSourceImpl]]: ... +) -> Callable[[Callable[TSourceFunParams, Any]], Callable[TSourceFunParams, TDltSourceImpl]]: + ... def source( @@ -258,7 +261,8 @@ def resource( table_format: TTableHintTemplate[TTableFormat] = None, selected: bool = True, spec: Type[BaseConfiguration] = None, -) -> DltResource: ... +) -> DltResource: + ... @overload @@ -275,7 +279,8 @@ def resource( table_format: TTableHintTemplate[TTableFormat] = None, selected: bool = True, spec: Type[BaseConfiguration] = None, -) -> Callable[[Callable[TResourceFunParams, Any]], DltResource]: ... 
+) -> Callable[[Callable[TResourceFunParams, Any]], DltResource]: + ... @overload @@ -293,7 +298,8 @@ def resource( selected: bool = True, spec: Type[BaseConfiguration] = None, standalone: Literal[True] = True, -) -> Callable[[Callable[TResourceFunParams, Any]], Callable[TResourceFunParams, DltResource]]: ... +) -> Callable[[Callable[TResourceFunParams, Any]], Callable[TResourceFunParams, DltResource]]: + ... @overload @@ -310,7 +316,8 @@ def resource( table_format: TTableHintTemplate[TTableFormat] = None, selected: bool = True, spec: Type[BaseConfiguration] = None, -) -> DltResource: ... +) -> DltResource: + ... def resource( @@ -415,7 +422,7 @@ def make_resource( ) def decorator( - f: Callable[TResourceFunParams, Any] + f: Callable[TResourceFunParams, Any], ) -> Callable[TResourceFunParams, DltResource]: if not callable(f): if data_from: @@ -522,7 +529,8 @@ def transformer( merge_key: TTableHintTemplate[TColumnNames] = None, selected: bool = True, spec: Type[BaseConfiguration] = None, -) -> Callable[[Callable[Concatenate[TDataItem, TResourceFunParams], Any]], DltResource]: ... +) -> Callable[[Callable[Concatenate[TDataItem, TResourceFunParams], Any]], DltResource]: + ... @overload @@ -542,7 +550,8 @@ def transformer( ) -> Callable[ [Callable[Concatenate[TDataItem, TResourceFunParams], Any]], Callable[TResourceFunParams, DltResource], -]: ... +]: + ... @overload @@ -558,7 +567,8 @@ def transformer( merge_key: TTableHintTemplate[TColumnNames] = None, selected: bool = True, spec: Type[BaseConfiguration] = None, -) -> DltResource: ... +) -> DltResource: + ... @overload @@ -575,7 +585,8 @@ def transformer( selected: bool = True, spec: Type[BaseConfiguration] = None, standalone: Literal[True] = True, -) -> Callable[TResourceFunParams, DltResource]: ... +) -> Callable[TResourceFunParams, DltResource]: + ... def transformer( @@ -703,7 +714,7 @@ def get_source_schema() -> Schema: def defer( - f: Callable[TDeferredFunParams, TBoundItems] + f: Callable[TDeferredFunParams, TBoundItems], ) -> Callable[TDeferredFunParams, TDeferred[TBoundItems]]: @wraps(f) def _wrap(*args: Any, **kwargs: Any) -> TDeferred[TBoundItems]: diff --git a/dlt/extract/exceptions.py b/dlt/extract/exceptions.py index 8e7d0dddf8..447f36c876 100644 --- a/dlt/extract/exceptions.py +++ b/dlt/extract/exceptions.py @@ -71,7 +71,9 @@ def __init__(self, pipe_name: str, gen: Any, msg: str, kind: str) -> None: self.func_name = ( gen.__name__ if isgenerator(gen) - else get_callable_name(gen) if callable(gen) else str(gen) + else get_callable_name(gen) + if callable(gen) + else str(gen) ) super().__init__( pipe_name, @@ -139,8 +141,7 @@ def __init__(self, resource_name: str, item: Any, _typ: Type[Any], msg: str) -> super().__init__( resource_name, f"Cannot create resource {resource_name} from specified data. If you want to process" - " just one data item, enclose it in a list. " - + msg, + " just one data item, enclose it in a list. " + msg, ) diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py index bc32893677..411026da10 100644 --- a/dlt/extract/extractors.py +++ b/dlt/extract/extractors.py @@ -209,9 +209,7 @@ def write_items(self, resource: DltResource, items: TDataItems, meta: Any) -> No for tbl in ( ( # 1. 
Convert pandas frame(s) to arrow Table - pa.Table.from_pandas(item) - if (pd and isinstance(item, pd.DataFrame)) - else item + pa.Table.from_pandas(item) if (pd and isinstance(item, pd.DataFrame)) else item ) for item in (items if isinstance(items, list) else [items]) ) diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 437dbbc6bd..66ab6e719d 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -105,7 +105,11 @@ def compute_table_schema(self, item: TDataItem = None) -> TTableSchema: if self._table_name_hint_fun and item is None: raise DataItemRequiredForDynamicTableHints(self.name) # resolve - resolved_template: TResourceHints = {k: self._resolve_hint(item, v) for k, v in table_template.items() if k not in ["incremental", "validator", "original_columns"]} # type: ignore + resolved_template: TResourceHints = { + k: self._resolve_hint(item, v) + for k, v in table_template.items() + if k not in ["incremental", "validator", "original_columns"] + } # type: ignore table_schema = self._merge_keys(resolved_template) table_schema["resource"] = self.name validate_dict_ignoring_xkeys( diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index adf0c33ad3..d49912244a 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -67,7 +67,8 @@ def __init__( def __call__( self, row: TDataItem, - ) -> Tuple[bool, bool, bool]: ... + ) -> Tuple[bool, bool, bool]: + ... class JsonIncremental(IncrementalTransform): diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 6f02f882bc..35812a5fb4 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -628,7 +628,9 @@ def __next__(self) -> PipeItem: elif callable(item): future = self._ensure_thread_pool().submit(item) # print(future) - self._futures.append(FuturePipeItem(future, pipe_item.step, pipe_item.pipe, pipe_item.meta)) # type: ignore + self._futures.append( + FuturePipeItem(future, pipe_item.step, pipe_item.pipe, pipe_item.meta) + ) # type: ignore # pipe item consumed for now, request a new one pipe_item = None continue diff --git a/dlt/extract/source.py b/dlt/extract/source.py index b1f59f7bda..4160a37af7 100644 --- a/dlt/extract/source.py +++ b/dlt/extract/source.py @@ -371,7 +371,9 @@ def __iter__(self) -> Iterator[TDataItem]: # managed pipe iterator will set the context on each call to __next__ with inject_section(section_context), Container().injectable_context(state_context): - pipe_iterator: ManagedPipeIterator = ManagedPipeIterator.from_pipes(self._resources.selected_pipes) # type: ignore + pipe_iterator: ManagedPipeIterator = ManagedPipeIterator.from_pipes( + self._resources.selected_pipes + ) # type: ignore pipe_iterator.set_context([section_context, state_context]) _iter = map(lambda item: item.item, pipe_iterator) return flatten_list_or_items(_iter) diff --git a/dlt/helpers/streamlit_helper.py b/dlt/helpers/streamlit_helper.py index d3e194b18d..f28b6887d3 100644 --- a/dlt/helpers/streamlit_helper.py +++ b/dlt/helpers/streamlit_helper.py @@ -123,13 +123,15 @@ def _query_data_live(query: str, schema_name: str = None) -> pd.DataFrame: st.header("Pipeline info") credentials = pipeline.sql_client().credentials schema_names = ", ".join(sorted(pipeline.schema_names)) - st.markdown(f""" + st.markdown( + f""" * pipeline name: **{pipeline.pipeline_name}** * destination: **{str(credentials)}** in **{pipeline.destination.destination_description}** * dataset name: **{pipeline.dataset_name}** * default schema name: **{pipeline.default_schema_name}** * all 
schema names: **{schema_names}** - """) + """ + ) st.header("Last load info") col1, col2, col3 = st.columns(3) diff --git a/dlt/load/configuration.py b/dlt/load/configuration.py index 0a84e3c331..fd9403b440 100644 --- a/dlt/load/configuration.py +++ b/dlt/load/configuration.py @@ -27,4 +27,5 @@ def __init__( workers: int = None, raise_on_failed_jobs: bool = False, _load_storage_config: LoadStorageConfiguration = None, - ) -> None: ... + ) -> None: + ... diff --git a/dlt/normalize/configuration.py b/dlt/normalize/configuration.py index 3949a07fa8..493ea83906 100644 --- a/dlt/normalize/configuration.py +++ b/dlt/normalize/configuration.py @@ -20,7 +20,8 @@ class ItemsNormalizerConfiguration(BaseConfiguration): if TYPE_CHECKING: - def __init__(self, add_dlt_id: bool = None, add_dlt_load_id: bool = None) -> None: ... + def __init__(self, add_dlt_id: bool = None, add_dlt_load_id: bool = None) -> None: + ... @configspec @@ -51,4 +52,5 @@ def __init__( _schema_storage_config: SchemaStorageConfiguration = None, _normalize_storage_config: NormalizeStorageConfiguration = None, _load_storage_config: LoadStorageConfiguration = None, - ) -> None: ... + ) -> None: + ... diff --git a/dlt/normalize/items_normalizers.py b/dlt/normalize/items_normalizers.py index 2167250036..cc15c781cd 100644 --- a/dlt/normalize/items_normalizers.py +++ b/dlt/normalize/items_normalizers.py @@ -42,7 +42,8 @@ def __init__( self.config = config @abstractmethod - def __call__(self, extracted_items_file: str, root_table_name: str) -> List[TSchemaUpdate]: ... + def __call__(self, extracted_items_file: str, root_table_name: str) -> List[TSchemaUpdate]: + ... class JsonLItemsNormalizer(ItemsNormalizer): diff --git a/dlt/pipeline/helpers.py b/dlt/pipeline/helpers.py index 7bba5f84e7..0dbae6cf2c 100644 --- a/dlt/pipeline/helpers.py +++ b/dlt/pipeline/helpers.py @@ -31,7 +31,7 @@ def retry_load( - retry_on_pipeline_steps: Sequence[TPipelineStep] = ("load",) + retry_on_pipeline_steps: Sequence[TPipelineStep] = ("load",), ) -> Callable[[BaseException], bool]: """A retry strategy for Tenacity that, with default setting, will repeat `load` step for all exceptions that are not terminal diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 73c8f076d1..a16f0f55ec 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -1163,9 +1163,9 @@ def _set_context(self, is_active: bool) -> None: # set destination context on activation if self.destination: # inject capabilities context - self._container[DestinationCapabilitiesContext] = ( - self._get_destination_capabilities() - ) + self._container[ + DestinationCapabilitiesContext + ] = self._get_destination_capabilities() else: # remove destination context on deactivation if DestinationCapabilitiesContext in self._container: @@ -1226,9 +1226,7 @@ def _maybe_destination_capabilities( self.destination.destination_name, ( # DestinationReference.to_name(self.destination), - self.staging.destination_name - if self.staging - else None + self.staging.destination_name if self.staging else None ), # DestinationReference.to_name(self.staging) if self.staging else None, destination_caps, diff --git a/dlt/pipeline/trace.py b/dlt/pipeline/trace.py index 5679884b0b..f17f5cdb71 100644 --- a/dlt/pipeline/trace.py +++ b/dlt/pipeline/trace.py @@ -198,11 +198,13 @@ def __str__(self) -> str: class SupportsTracking(Protocol): def on_start_trace( self, trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline - ) -> None: ... + ) -> None: + ... 
def on_start_trace_step( self, trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline - ) -> None: ... + ) -> None: + ... def on_end_trace_step( self, @@ -211,11 +213,13 @@ def on_end_trace_step( pipeline: SupportsPipeline, step_info: Any, send_state: bool, - ) -> None: ... + ) -> None: + ... def on_end_trace( self, trace: PipelineTrace, pipeline: SupportsPipeline, send_state: bool - ) -> None: ... + ) -> None: + ... # plug in your own tracking modules here diff --git a/docs/examples/archive/sources/google_sheets.py b/docs/examples/archive/sources/google_sheets.py index 69855154ae..a5532d9ad9 100644 --- a/docs/examples/archive/sources/google_sheets.py +++ b/docs/examples/archive/sources/google_sheets.py @@ -17,7 +17,7 @@ def _initialize_sheets( - credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials] + credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials], ) -> Any: # Build the service object. service = build("sheets", "v4", credentials=credentials.to_native_credentials()) diff --git a/docs/examples/archive/sources/singer_tap.py b/docs/examples/archive/sources/singer_tap.py index 3c733c33f1..3e8bdad849 100644 --- a/docs/examples/archive/sources/singer_tap.py +++ b/docs/examples/archive/sources/singer_tap.py @@ -104,7 +104,7 @@ def singer_messages() -> Iterator[TDataItem]: os.path.abspath(config_file_path), "--catalog", os.path.abspath(catalog_file_path), - *state_params + *state_params, ) yield from get_source_from_stream(pipe_iterator, state) diff --git a/docs/examples/google_sheets/google_sheets.py b/docs/examples/google_sheets/google_sheets.py index 8a93df9970..d350a4f306 100644 --- a/docs/examples/google_sheets/google_sheets.py +++ b/docs/examples/google_sheets/google_sheets.py @@ -9,13 +9,15 @@ ) from dlt.common.typing import DictStrAny, StrAny + def _initialize_sheets( - credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials] + credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials], ) -> Any: # Build the service object. 
service = build("sheets", "v4", credentials=credentials.to_native_credentials()) return service + @dlt.source def google_spreadsheet( spreadsheet_id: str, @@ -55,6 +57,7 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]: for name in sheet_names ] + if __name__ == "__main__": pipeline = dlt.pipeline(destination="duckdb") # see example.secrets.toml to where to put credentials @@ -67,4 +70,4 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]: sheet_names=range_names, ) ) - print(info) \ No newline at end of file + print(info) diff --git a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py index d1ba3537ea..fecd842214 100644 --- a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py +++ b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py @@ -10,11 +10,7 @@ def list_files(folder_path: str): folder_path = os.path.abspath(folder_path) for filename in os.listdir(folder_path): file_path = os.path.join(folder_path, filename) - yield { - "file_name": filename, - "file_path": file_path, - "mtime": os.path.getmtime(file_path) - } + yield {"file_name": filename, "file_path": file_path, "mtime": os.path.getmtime(file_path)} @dlt.transformer(primary_key="page_id", write_disposition="merge") @@ -30,10 +26,8 @@ def pdf_to_text(file_item, separate_pages: bool = False): page_item["page_id"] = file_item["file_name"] + "_" + str(page_no) yield page_item -pipeline = dlt.pipeline( - pipeline_name='pdf_to_text', - destination='weaviate' -) + +pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate") # this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf" # (3) sends them to pdf_to_text transformer with pipe (|) operator @@ -46,9 +40,7 @@ def pdf_to_text(file_item, separate_pages: bool = False): pdf_pipeline.table_name = "InvoiceText" # use weaviate_adapter to tell destination to vectorize "text" column -load_info = pipeline.run( - weaviate_adapter(pdf_pipeline, vectorize="text") -) +load_info = pipeline.run(weaviate_adapter(pdf_pipeline, vectorize="text")) row_counts = pipeline.last_trace.last_normalize_info print(row_counts) print("------") @@ -58,4 +50,4 @@ def pdf_to_text(file_item, separate_pages: bool = False): client = weaviate.Client("http://localhost:8080") # get text of all the invoices in InvoiceText class we just created above -print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do()) \ No newline at end of file +print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do()) diff --git a/docs/examples/qdrant_zendesk/qdrant.py b/docs/examples/qdrant_zendesk/qdrant.py index bd0cbafc99..1add22db63 100644 --- a/docs/examples/qdrant_zendesk/qdrant.py +++ b/docs/examples/qdrant_zendesk/qdrant.py @@ -51,7 +51,7 @@ def tickets_data( initial_value=start_date_obj, end_value=end_date_obj, allow_external_schedulers=True, - ) + ), ): # URL For ticket events # 'https://d3v-dlthub.zendesk.com/api/v2/incremental/tickets_data.json?start_time=946684800' diff --git a/docs/website/docs/conftest.py b/docs/website/docs/conftest.py index 87ccffe53b..49391c0ebe 100644 --- a/docs/website/docs/conftest.py +++ b/docs/website/docs/conftest.py @@ -52,6 +52,6 @@ def _initial_providers(): def pytest_configure(config): # push sentry to ci - os.environ["RUNTIME__SENTRY_DSN"] = ( - "https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752" - ) + os.environ[ + "RUNTIME__SENTRY_DSN" + ] = 
"https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752" diff --git a/docs/website/docs/examples/google_sheets/code/google_sheets-snippets.py b/docs/website/docs/examples/google_sheets/code/google_sheets-snippets.py index 4f32f65370..851800fdf4 100644 --- a/docs/website/docs/examples/google_sheets/code/google_sheets-snippets.py +++ b/docs/website/docs/examples/google_sheets/code/google_sheets-snippets.py @@ -19,7 +19,7 @@ def google_sheets_snippet() -> None: from dlt.common.typing import DictStrAny, StrAny def _initialize_sheets( - credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials] + credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials], ) -> Any: # Build the service object. service = build("sheets", "v4", credentials=credentials.to_native_credentials()) @@ -80,8 +80,8 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]: ) ) print(info) - # @@@DLT_SNIPPET_END google_sheets_run - # @@@DLT_SNIPPET_END example + # @@@DLT_SNIPPET_END google_sheets_run + # @@@DLT_SNIPPET_END example row_counts = pipeline.last_trace.last_normalize_info.row_counts print(row_counts.keys()) assert row_counts["hidden_columns_merged_cells"] == 7 diff --git a/docs/website/docs/examples/pdf_to_weaviate/code/pdf_to_weaviate-snippets.py b/docs/website/docs/examples/pdf_to_weaviate/code/pdf_to_weaviate-snippets.py index fddae74ddf..1ad7cc8159 100644 --- a/docs/website/docs/examples/pdf_to_weaviate/code/pdf_to_weaviate-snippets.py +++ b/docs/website/docs/examples/pdf_to_weaviate/code/pdf_to_weaviate-snippets.py @@ -1,5 +1,6 @@ from tests.pipeline.utils import assert_load_info + def pdf_to_weaviate_snippet() -> None: # @@@DLT_SNIPPET_START example # @@@DLT_SNIPPET_START pdf_to_weaviate @@ -9,7 +10,6 @@ def pdf_to_weaviate_snippet() -> None: from dlt.destinations.impl.weaviate import weaviate_adapter from PyPDF2 import PdfReader - @dlt.resource(selected=False) def list_files(folder_path: str): folder_path = os.path.abspath(folder_path) @@ -18,10 +18,9 @@ def list_files(folder_path: str): yield { "file_name": filename, "file_path": file_path, - "mtime": os.path.getmtime(file_path) + "mtime": os.path.getmtime(file_path), } - @dlt.transformer(primary_key="page_id", write_disposition="merge") def pdf_to_text(file_item, separate_pages: bool = False): if not separate_pages: @@ -35,10 +34,7 @@ def pdf_to_text(file_item, separate_pages: bool = False): page_item["page_id"] = file_item["file_name"] + "_" + str(page_no) yield page_item - pipeline = dlt.pipeline( - pipeline_name='pdf_to_text', - destination='weaviate' - ) + pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate") # this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf" # (3) sends them to pdf_to_text transformer with pipe (|) operator @@ -51,9 +47,7 @@ def pdf_to_text(file_item, separate_pages: bool = False): pdf_pipeline.table_name = "InvoiceText" # use weaviate_adapter to tell destination to vectorize "text" column - load_info = pipeline.run( - weaviate_adapter(pdf_pipeline, vectorize="text") - ) + load_info = pipeline.run(weaviate_adapter(pdf_pipeline, vectorize="text")) row_counts = pipeline.last_trace.last_normalize_info print(row_counts) print("------") diff --git a/docs/website/docs/examples/qdrant_zendesk/code/qdrant-snippets.py b/docs/website/docs/examples/qdrant_zendesk/code/qdrant-snippets.py index 07b44c6638..04c36345fb 100644 --- a/docs/website/docs/examples/qdrant_zendesk/code/qdrant-snippets.py +++ 
b/docs/website/docs/examples/qdrant_zendesk/code/qdrant-snippets.py @@ -61,7 +61,7 @@ def tickets_data( initial_value=start_date_obj, end_value=end_date_obj, allow_external_schedulers=True, - ) + ), ): # URL For ticket events # 'https://d3v-dlthub.zendesk.com/api/v2/incremental/tickets_data.json?start_time=946684800' diff --git a/docs/website/docs/getting-started-snippets.py b/docs/website/docs/getting-started-snippets.py index eb00df9986..b6907f8526 100644 --- a/docs/website/docs/getting-started-snippets.py +++ b/docs/website/docs/getting-started-snippets.py @@ -147,7 +147,7 @@ def incremental_snippet() -> None: @dlt.resource(table_name="issues", write_disposition="append") def get_issues( - created_at=dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z") + created_at=dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z"), ): # NOTE: we read only open issues to minimize number of calls to the API. There's a limit of ~50 calls for not authenticated Github users url = "https://api.github.com/repos/dlt-hub/dlt/issues?per_page=100&sort=created&directions=desc&state=open" @@ -194,7 +194,7 @@ def incremental_merge_snippet() -> None: primary_key="id", ) def get_issues( - updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z") + updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z"), ): # NOTE: we read only open issues to minimize number of calls to the API. There's a limit of ~50 calls for not authenticated Github users url = f"https://api.github.com/repos/dlt-hub/dlt/issues?since={updated_at.last_value}&per_page=100&sort=updated&directions=desc&state=open" diff --git a/docs/website/docs/intro-snippets.py b/docs/website/docs/intro-snippets.py index 340a6ff262..f270dcee6e 100644 --- a/docs/website/docs/intro-snippets.py +++ b/docs/website/docs/intro-snippets.py @@ -18,14 +18,13 @@ def intro_snippet() -> None: response.raise_for_status() data.append(response.json()) # Extract, normalize, and load the data - load_info = pipeline.run(data, table_name='player') + load_info = pipeline.run(data, table_name="player") # @@@DLT_SNIPPET_END api assert_load_info(load_info) def csv_snippet() -> None: - # @@@DLT_SNIPPET_START csv import dlt import pandas as pd @@ -50,8 +49,8 @@ def csv_snippet() -> None: assert_load_info(load_info) -def db_snippet() -> None: +def db_snippet() -> None: # @@@DLT_SNIPPET_START db import dlt from sqlalchemy import create_engine @@ -74,13 +73,9 @@ def db_snippet() -> None: ) # Convert the rows into dictionaries on the fly with a map function - load_info = pipeline.run( - map(lambda row: dict(row._mapping), rows), - table_name="genome" - ) + load_info = pipeline.run(map(lambda row: dict(row._mapping), rows), table_name="genome") print(load_info) # @@@DLT_SNIPPET_END db assert_load_info(load_info) - diff --git a/docs/website/docs/tutorial/load-data-from-an-api-snippets.py b/docs/website/docs/tutorial/load-data-from-an-api-snippets.py index cd7004bdbe..25c7a40207 100644 --- a/docs/website/docs/tutorial/load-data-from-an-api-snippets.py +++ b/docs/website/docs/tutorial/load-data-from-an-api-snippets.py @@ -3,7 +3,6 @@ def basic_api_snippet() -> None: - # @@@DLT_SNIPPET_START basic_api import dlt from dlt.sources.helpers import requests @@ -15,9 +14,9 @@ def basic_api_snippet() -> None: response.raise_for_status() pipeline = dlt.pipeline( - pipeline_name='github_issues', - destination='duckdb', - dataset_name='github_data', + pipeline_name="github_issues", + destination="duckdb", + 
dataset_name="github_data", ) # The response contains a list of issues load_info = pipeline.run(response.json(), table_name="issues") @@ -29,19 +28,15 @@ def basic_api_snippet() -> None: def replace_snippet() -> None: - # @@@DLT_SNIPPET_START replace import dlt - data = [ - {'id': 1, 'name': 'Alice'}, - {'id': 2, 'name': 'Bob'} - ] + data = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}] pipeline = dlt.pipeline( - pipeline_name='replace_data', - destination='duckdb', - dataset_name='mydata', + pipeline_name="replace_data", + destination="duckdb", + dataset_name="mydata", ) load_info = pipeline.run(data, table_name="users", write_disposition="replace") @@ -52,14 +47,13 @@ def replace_snippet() -> None: def incremental_snippet() -> None: - # @@@DLT_SNIPPET_START incremental import dlt from dlt.sources.helpers import requests @dlt.resource(table_name="issues", write_disposition="append") def get_issues( - created_at=dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z") + created_at=dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z"), ): # NOTE: we read only open issues to minimize number of calls to the API. # There's a limit of ~50 calls for not authenticated Github users. @@ -86,9 +80,9 @@ def get_issues( url = response.links["next"]["url"] pipeline = dlt.pipeline( - pipeline_name='github_issues_incremental', - destination='duckdb', - dataset_name='github_data_append', + pipeline_name="github_issues_incremental", + destination="duckdb", + dataset_name="github_data_append", ) load_info = pipeline.run(get_issues) @@ -103,7 +97,6 @@ def get_issues( def incremental_merge_snippet() -> None: - # @@@DLT_SNIPPET_START incremental_merge import dlt from dlt.sources.helpers import requests @@ -114,7 +107,7 @@ def incremental_merge_snippet() -> None: primary_key="id", ) def get_issues( - updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z") + updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z"), ): # NOTE: we read only open issues to minimize number of calls to # the API. 
There's a limit of ~50 calls for not authenticated @@ -136,9 +129,9 @@ def get_issues( url = response.links["next"]["url"] pipeline = dlt.pipeline( - pipeline_name='github_issues_merge', - destination='duckdb', - dataset_name='github_data_merge', + pipeline_name="github_issues_merge", + destination="duckdb", + dataset_name="github_data_merge", ) load_info = pipeline.run(get_issues) row_counts = pipeline.last_trace.last_normalize_info @@ -152,15 +145,12 @@ def get_issues( def table_dispatch_snippet() -> None: - # @@@DLT_SNIPPET_START table_dispatch import dlt from dlt.sources.helpers import requests @dlt.resource(primary_key="id", table_name=lambda i: i["type"], write_disposition="append") - def repo_events( - last_created_at = dlt.sources.incremental("created_at") - ): + def repo_events(last_created_at=dlt.sources.incremental("created_at")): url = "https://api.github.com/repos/dlt-hub/dlt/events?per_page=100" while True: @@ -179,9 +169,9 @@ def repo_events( url = response.links["next"]["url"] pipeline = dlt.pipeline( - pipeline_name='github_events', - destination='duckdb', - dataset_name='github_events_data', + pipeline_name="github_events", + destination="duckdb", + dataset_name="github_events_data", ) load_info = pipeline.run(repo_events) row_counts = pipeline.last_trace.last_normalize_info diff --git a/poetry.lock b/poetry.lock index c5da40c604..8588cdeb5f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -7030,6 +7030,32 @@ files = [ [package.dependencies] pyasn1 = ">=0.1.3" +[[package]] +name = "ruff" +version = "0.1.13" +description = "An extremely fast Python linter and code formatter, written in Rust." +optional = false +python-versions = ">=3.7" +files = [ + {file = "ruff-0.1.13-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:e3fd36e0d48aeac672aa850045e784673449ce619afc12823ea7868fcc41d8ba"}, + {file = "ruff-0.1.13-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:9fb6b3b86450d4ec6a6732f9f60c4406061b6851c4b29f944f8c9d91c3611c7a"}, + {file = "ruff-0.1.13-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b13ba5d7156daaf3fd08b6b993360a96060500aca7e307d95ecbc5bb47a69296"}, + {file = "ruff-0.1.13-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:9ebb40442f7b531e136d334ef0851412410061e65d61ca8ce90d894a094feb22"}, + {file = "ruff-0.1.13-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:226b517f42d59a543d6383cfe03cccf0091e3e0ed1b856c6824be03d2a75d3b6"}, + {file = "ruff-0.1.13-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:5f0312ba1061e9b8c724e9a702d3c8621e3c6e6c2c9bd862550ab2951ac75c16"}, + {file = "ruff-0.1.13-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2f59bcf5217c661254bd6bc42d65a6fd1a8b80c48763cb5c2293295babd945dd"}, + {file = "ruff-0.1.13-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:e6894b00495e00c27b6ba61af1fc666f17de6140345e5ef27dd6e08fb987259d"}, + {file = "ruff-0.1.13-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9a1600942485c6e66119da294c6294856b5c86fd6df591ce293e4a4cc8e72989"}, + {file = "ruff-0.1.13-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:ee3febce7863e231a467f90e681d3d89210b900d49ce88723ce052c8761be8c7"}, + {file = "ruff-0.1.13-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:dcaab50e278ff497ee4d1fe69b29ca0a9a47cd954bb17963628fa417933c6eb1"}, + {file = "ruff-0.1.13-py3-none-musllinux_1_2_i686.whl", hash = 
"sha256:f57de973de4edef3ad3044d6a50c02ad9fc2dff0d88587f25f1a48e3f72edf5e"}, + {file = "ruff-0.1.13-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:7a36fa90eb12208272a858475ec43ac811ac37e91ef868759770b71bdabe27b6"}, + {file = "ruff-0.1.13-py3-none-win32.whl", hash = "sha256:a623349a505ff768dad6bd57087e2461be8db58305ebd5577bd0e98631f9ae69"}, + {file = "ruff-0.1.13-py3-none-win_amd64.whl", hash = "sha256:f988746e3c3982bea7f824c8fa318ce7f538c4dfefec99cd09c8770bd33e6539"}, + {file = "ruff-0.1.13-py3-none-win_arm64.whl", hash = "sha256:6bbbc3042075871ec17f28864808540a26f0f79a4478c357d3e3d2284e832998"}, + {file = "ruff-0.1.13.tar.gz", hash = "sha256:e261f1baed6291f434ffb1d5c6bd8051d1c2a26958072d38dfbec39b3dda7352"}, +] + [[package]] name = "s3fs" version = "2023.6.0" @@ -8471,4 +8497,4 @@ weaviate = ["weaviate-client"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<3.13" -content-hash = "cf751b2e1e9c66efde0a11774b5204e3206a14fd04ba4c79b2d37e38db5367ad" +content-hash = "c85b3637a2f777a149c35faface09595d3f537574532a28d5e649d758a1a33d4" diff --git a/pyproject.toml b/pyproject.toml index 6436ec23a7..c08ea53733 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,6 +76,7 @@ weaviate-client = {version = ">=3.22", optional = true} adlfs = {version = ">=2022.4.0", optional = true} pyodbc = {version = "^4.0.39", optional = true} qdrant-client = {version = "^1.6.4", optional = true, extras = ["fastembed"]} +ruff = "^0.1.13" [tool.poetry.extras] dbt = ["dbt-core", "dbt-redshift", "dbt-bigquery", "dbt-duckdb", "dbt-snowflake", "dbt-athena-community"] @@ -188,7 +189,7 @@ profile = "black" src_paths = ["dlt"] multi_line_output = 3 -[tool.ruff] # https://beta.ruff.rs/docs/ +[tool.ruff] line-length = 100 ignore = ["F401"] ignore-init-module-imports = true diff --git a/tests/cases.py b/tests/cases.py index 8653f999c6..41c30b695b 100644 --- a/tests/cases.py +++ b/tests/cases.py @@ -186,14 +186,16 @@ def assert_all_data_types_row( parsed_date = pendulum.instance(db_mapping["col4_precision"]) db_mapping["col4_precision"] = reduce_pendulum_datetime_precision(parsed_date, 3) expected_rows["col4_precision"] = reduce_pendulum_datetime_precision( - ensure_pendulum_datetime(expected_rows["col4_precision"]), 3 # type: ignore[arg-type] + ensure_pendulum_datetime(expected_rows["col4_precision"]), + 3, # type: ignore[arg-type] ) if "col11_precision" in expected_rows: parsed_time = ensure_pendulum_time(db_mapping["col11_precision"]) db_mapping["col11_precision"] = reduce_pendulum_datetime_precision(parsed_time, 3) expected_rows["col11_precision"] = reduce_pendulum_datetime_precision( - ensure_pendulum_time(expected_rows["col11_precision"]), 3 # type: ignore[arg-type] + ensure_pendulum_time(expected_rows["col11_precision"]), + 3, # type: ignore[arg-type] ) # redshift and bigquery return strings from structured fields diff --git a/tests/cli/test_deploy_command.py b/tests/cli/test_deploy_command.py index 685921ca6e..46094f0d22 100644 --- a/tests/cli/test_deploy_command.py +++ b/tests/cli/test_deploy_command.py @@ -43,7 +43,7 @@ def test_deploy_command_no_repo( "debug_pipeline.py", deployment_method, deploy_command.COMMAND_DEPLOY_REPO_LOCATION, - **deployment_args + **deployment_args, ) # test wrapper @@ -51,7 +51,7 @@ def test_deploy_command_no_repo( "debug_pipeline.py", deployment_method, deploy_command.COMMAND_DEPLOY_REPO_LOCATION, - **deployment_args + **deployment_args, ) assert rc == -3 @@ -77,14 +77,14 @@ def test_deploy_command( "debug_pipeline.py", deployment_method, 
deploy_command.COMMAND_DEPLOY_REPO_LOCATION, - **deployment_args + **deployment_args, ) assert "Your current repository has no origin set" in py_ex.value.args[0] rc = _dlt.deploy_command_wrapper( "debug_pipeline.py", deployment_method, deploy_command.COMMAND_DEPLOY_REPO_LOCATION, - **deployment_args + **deployment_args, ) assert rc == -5 @@ -95,13 +95,13 @@ def test_deploy_command( "debug_pipeline.py", deployment_method, deploy_command.COMMAND_DEPLOY_REPO_LOCATION, - **deployment_args + **deployment_args, ) rc = _dlt.deploy_command_wrapper( "debug_pipeline.py", deployment_method, deploy_command.COMMAND_DEPLOY_REPO_LOCATION, - **deployment_args + **deployment_args, ) assert rc == -2 @@ -118,14 +118,14 @@ def test_deploy_command( "debug_pipeline.py", deployment_method, deploy_command.COMMAND_DEPLOY_REPO_LOCATION, - **deployment_args + **deployment_args, ) assert "The last pipeline run ended with error" in py_ex2.value.args[0] rc = _dlt.deploy_command_wrapper( "debug_pipeline.py", deployment_method, deploy_command.COMMAND_DEPLOY_REPO_LOCATION, - **deployment_args + **deployment_args, ) assert rc == -2 @@ -142,7 +142,7 @@ def test_deploy_command( "debug_pipeline.py", deployment_method, deploy_command.COMMAND_DEPLOY_REPO_LOCATION, - **deployment_args + **deployment_args, ) _out = buf.getvalue() print(_out) @@ -163,13 +163,13 @@ def test_deploy_command( "no_pipeline.py", deployment_method, deploy_command.COMMAND_DEPLOY_REPO_LOCATION, - **deployment_args + **deployment_args, ) with echo.always_choose(False, always_choose_value=True): rc = _dlt.deploy_command_wrapper( "no_pipeline.py", deployment_method, deploy_command.COMMAND_DEPLOY_REPO_LOCATION, - **deployment_args + **deployment_args, ) assert rc == -4 diff --git a/tests/cli/test_init_command.py b/tests/cli/test_init_command.py index 479bedb6fb..2b949ff9ad 100644 --- a/tests/cli/test_init_command.py +++ b/tests/cli/test_init_command.py @@ -474,8 +474,7 @@ def test_incompatible_dlt_version_warning(repo_dir: str, project_files: FileStor assert ( "WARNING: This pipeline requires a newer version of dlt than your installed version" - " (0.1.1)." - in _out + " (0.1.1)." 
in _out ) diff --git a/tests/common/configuration/test_accessors.py b/tests/common/configuration/test_accessors.py index 147d56abec..d67114e064 100644 --- a/tests/common/configuration/test_accessors.py +++ b/tests/common/configuration/test_accessors.py @@ -158,7 +158,13 @@ def test_getter_accessor_typed(toml_providers: ConfigProvidersContext, environme c = dlt.secrets.get("databricks.credentials", ConnectionStringCredentials) # as before: the value in trace is the value coming from the provider (as is) assert RESOLVED_TRACES["databricks.credentials"] == ResolvedValueTrace( - "credentials", credentials_str, None, ConnectionStringCredentials, ["databricks"], SecretsTomlProvider().name, ConnectionStringCredentials # type: ignore[arg-type] + "credentials", + credentials_str, + None, + ConnectionStringCredentials, + ["databricks"], + SecretsTomlProvider().name, + ConnectionStringCredentials, # type: ignore[arg-type] ) assert c.drivername == "databricks+connector" c2 = dlt.secrets.get("destination.credentials", GcpServiceAccountCredentialsWithoutDefaults) @@ -184,7 +190,9 @@ def test_setter(toml_providers: ConfigProvidersContext, environment: Any) -> Non dlt.secrets["pipeline.new.credentials"] = {"api_key": "skjo87a7nnAAaa"} assert dlt.secrets["pipeline.new.credentials"] == {"api_key": "skjo87a7nnAAaa"} # check the toml directly - assert dlt.secrets.writable_provider._toml["pipeline"]["new"]["credentials"] == {"api_key": "skjo87a7nnAAaa"} # type: ignore[attr-defined] + assert dlt.secrets.writable_provider._toml["pipeline"]["new"]["credentials"] == { + "api_key": "skjo87a7nnAAaa" + } # type: ignore[attr-defined] # mod the config and use it to resolve the configuration dlt.config["pool"] = {"pool_type": "process", "workers": 21} diff --git a/tests/common/configuration/test_configuration.py b/tests/common/configuration/test_configuration.py index 81d49432d7..1b7b9562cc 100644 --- a/tests/common/configuration/test_configuration.py +++ b/tests/common/configuration/test_configuration.py @@ -130,7 +130,8 @@ class FieldWithNoDefaultConfiguration(RunConfiguration): if TYPE_CHECKING: - def __init__(self, no_default: str = None, sentry_dsn: str = None) -> None: ... + def __init__(self, no_default: str = None, sentry_dsn: str = None) -> None: + ... @configspec @@ -158,7 +159,8 @@ def on_resolved(self) -> None: if TYPE_CHECKING: - def __init__(self, head: str = None, tube: List[str] = None, heels: str = None) -> None: ... + def __init__(self, head: str = None, tube: List[str] = None, heels: str = None) -> None: + ... @configspec @@ -174,7 +176,8 @@ def __init__( default: str = None, instrumented: InstrumentedConfiguration = None, sectioned: SectionedConfiguration = None, - ) -> None: ... + ) -> None: + ... @configspec @@ -707,7 +710,9 @@ def test_removes_trace_value_from_exception_trace_attrs( ) -> None: with pytest.raises(ConfigFieldMissingException) as cf_missing_exc: resolve.resolve_configuration(CoercionTestConfiguration()) - cf_missing_exc.value.traces["str_val"][0] = cf_missing_exc.value.traces["str_val"][0]._replace(value="SECRET") # type: ignore[index] + cf_missing_exc.value.traces["str_val"][0] = cf_missing_exc.value.traces["str_val"][0]._replace( + value="SECRET" + ) # type: ignore[index] assert cf_missing_exc.value.traces["str_val"][0].value == "SECRET" attrs_ = cf_missing_exc.value.attrs() # values got cleared up @@ -940,7 +945,8 @@ class AutoBaseDerivationConfiguration: if TYPE_CHECKING: - def __init__(self, auto: str = None) -> None: ... + def __init__(self, auto: str = None) -> None: + ... 
assert issubclass(AutoBaseDerivationConfiguration, BaseConfiguration) assert hasattr(AutoBaseDerivationConfiguration, "auto") @@ -1085,8 +1091,14 @@ def test_resolved_trace(environment: Any) -> None: def test_extract_inner_hint() -> None: # extracts base config from an union - assert resolve.extract_inner_hint(Union[GcpServiceAccountCredentialsWithoutDefaults, StrAny, str]) is GcpServiceAccountCredentialsWithoutDefaults # type: ignore[arg-type] - assert resolve.extract_inner_hint(Union[InstrumentedConfiguration, StrAny, str]) is InstrumentedConfiguration # type: ignore[arg-type] + assert ( + resolve.extract_inner_hint(Union[GcpServiceAccountCredentialsWithoutDefaults, StrAny, str]) + is GcpServiceAccountCredentialsWithoutDefaults + ) # type: ignore[arg-type] + assert ( + resolve.extract_inner_hint(Union[InstrumentedConfiguration, StrAny, str]) + is InstrumentedConfiguration + ) # type: ignore[arg-type] # keeps unions assert resolve.extract_inner_hint(Union[StrAny, str]) is Union # type: ignore[arg-type] # ignores specialization in list and dict, leaving origin @@ -1108,7 +1120,10 @@ def test_is_secret_hint() -> None: TTestSecretNt = NewType("TTestSecretNt", GcpServiceAccountCredentialsWithoutDefaults) assert resolve.is_secret_hint(TTestSecretNt) is False # recognize unions with credentials - assert resolve.is_secret_hint(Union[GcpServiceAccountCredentialsWithoutDefaults, StrAny, str]) is True # type: ignore[arg-type] + assert ( + resolve.is_secret_hint(Union[GcpServiceAccountCredentialsWithoutDefaults, StrAny, str]) + is True + ) # type: ignore[arg-type] # we do not recognize unions if they do not contain configuration types assert resolve.is_secret_hint(Union[TSecretValue, StrAny, str]) is False # type: ignore[arg-type] assert resolve.is_secret_hint(Optional[str]) is False # type: ignore[arg-type] diff --git a/tests/common/configuration/test_container.py b/tests/common/configuration/test_container.py index 9521f5960d..068a1063f7 100644 --- a/tests/common/configuration/test_container.py +++ b/tests/common/configuration/test_container.py @@ -27,7 +27,8 @@ def parse_native_representation(self, native_value: Any) -> None: if TYPE_CHECKING: - def __init__(self, current_value: str = None) -> None: ... + def __init__(self, current_value: str = None) -> None: + ... 
@configspec diff --git a/tests/common/configuration/test_credentials.py b/tests/common/configuration/test_credentials.py index ae9b96e903..4ebe49942a 100644 --- a/tests/common/configuration/test_credentials.py +++ b/tests/common/configuration/test_credentials.py @@ -57,11 +57,14 @@ } """ -OAUTH_APP_USER_INFO = """ +OAUTH_APP_USER_INFO = ( + """ { "installed": %s } -""" % OAUTH_USER_INFO +""" + % OAUTH_USER_INFO +) def test_connection_string_credentials_native_representation(environment) -> None: diff --git a/tests/common/configuration/test_inject.py b/tests/common/configuration/test_inject.py index 8b9616ccd7..cc107a1dfb 100644 --- a/tests/common/configuration/test_inject.py +++ b/tests/common/configuration/test_inject.py @@ -265,7 +265,7 @@ def test_use_most_specific_union_type( ) -> None: @with_config def postgres_union( - local_credentials: Union[ConnectionStringCredentials, str, StrAny] = dlt.secrets.value + local_credentials: Union[ConnectionStringCredentials, str, StrAny] = dlt.secrets.value, ): return local_credentials diff --git a/tests/common/configuration/test_spec_union.py b/tests/common/configuration/test_spec_union.py index 25c32920bc..4fcddd8f73 100644 --- a/tests/common/configuration/test_spec_union.py +++ b/tests/common/configuration/test_spec_union.py @@ -181,10 +181,14 @@ def zen_source( # pass explicit dict assert list(zen_source(credentials={"email": "emx", "password": "pass"}))[0].email == "emx" # type: ignore[arg-type] - assert list(zen_source(credentials={"api_key": "🔑", "api_secret": ":secret:"}))[0].api_key == "🔑" # type: ignore[arg-type] + assert ( + list(zen_source(credentials={"api_key": "🔑", "api_secret": ":secret:"}))[0].api_key == "🔑" + ) # type: ignore[arg-type] # mixed credentials will not work with pytest.raises(ConfigFieldMissingException): - assert list(zen_source(credentials={"api_key": "🔑", "password": "pass"}))[0].api_key == "🔑" # type: ignore[arg-type] + assert ( + list(zen_source(credentials={"api_key": "🔑", "password": "pass"}))[0].api_key == "🔑" + ) # type: ignore[arg-type] class GoogleAnalyticsCredentialsBase(CredentialsConfiguration): @@ -212,7 +216,7 @@ class GoogleAnalyticsCredentialsOAuth(GoogleAnalyticsCredentialsBase): def google_analytics( credentials: Union[ GoogleAnalyticsCredentialsOAuth, GcpServiceAccountCredentials - ] = dlt.secrets.value + ] = dlt.secrets.value, ): yield dlt.resource([credentials], name="creds") diff --git a/tests/common/configuration/test_toml_provider.py b/tests/common/configuration/test_toml_provider.py index fcec881521..6c67a93f2b 100644 --- a/tests/common/configuration/test_toml_provider.py +++ b/tests/common/configuration/test_toml_provider.py @@ -214,8 +214,7 @@ def test_secrets_toml_credentials_from_native_repr( GcpServiceAccountCredentialsWithoutDefaults(), sections=("source",) ) assert ( - c.private_key - == "-----BEGIN PRIVATE" + c.private_key == "-----BEGIN PRIVATE" " KEY-----\nMIIEuwIBADANBgkqhkiG9w0BAQEFAASCBKUwggShAgEAAoIBAQCNEN0bL39HmD+S\n...\n-----END" " PRIVATE KEY-----\n" ) @@ -389,28 +388,33 @@ def test_write_toml_value(toml_providers: ConfigProvidersContext) -> None: if not provider.is_writable: continue - new_doc = tomlkit.parse(""" + new_doc = tomlkit.parse( + """ int_val=2232 [table] inner_int_val=2121 - """) + """ + ) # key == None replaces the whole document provider.set_value(None, new_doc, None) assert provider._toml == new_doc # key != None merges documents - to_merge_doc = tomlkit.parse(""" + to_merge_doc = tomlkit.parse( + """ int_val=2137 [babble] word1="do" word2="you" - """) + """ + 
) provider.set_value("", to_merge_doc, None) - merged_doc = tomlkit.parse(""" + merged_doc = tomlkit.parse( + """ int_val=2137 [babble] @@ -420,7 +424,8 @@ def test_write_toml_value(toml_providers: ConfigProvidersContext) -> None: [table] inner_int_val=2121 - """) + """ + ) assert provider._toml == merged_doc # currently we ignore the key when merging tomlkit @@ -434,34 +439,51 @@ def test_write_toml_value(toml_providers: ConfigProvidersContext) -> None: def test_toml_string_provider() -> None: # test basic reading - provider = StringTomlProvider(""" + provider = StringTomlProvider( + """ [section1.subsection] key1 = "value1" [section2.subsection] key2 = "value2" -""") +""" + ) - assert provider.get_value("key1", "", "section1", "subsection") == ("value1", "section1.subsection.key1") # type: ignore[arg-type] - assert provider.get_value("key2", "", "section2", "subsection") == ("value2", "section2.subsection.key2") # type: ignore[arg-type] + assert provider.get_value("key1", "", "section1", "subsection") == ( + "value1", + "section1.subsection.key1", + ) # type: ignore[arg-type] + assert provider.get_value("key2", "", "section2", "subsection") == ( + "value2", + "section2.subsection.key2", + ) # type: ignore[arg-type] # test basic writing provider = StringTomlProvider("") assert provider.dumps() == "" provider.set_value("key1", "value1", "section1", "subsection") - assert provider.dumps() == """[section1.subsection] + assert ( + provider.dumps() + == """[section1.subsection] key1 = \"value1\" """ + ) provider.set_value("key1", "other_value", "section1", "subsection") - assert provider.dumps() == """[section1.subsection] + assert ( + provider.dumps() + == """[section1.subsection] key1 = \"other_value\" """ + ) provider.set_value("key1", "other_value", "section2", "subsection") - assert provider.dumps() == """[section1.subsection] + assert ( + provider.dumps() + == """[section1.subsection] key1 = \"other_value\" [section2.subsection] key1 = \"other_value\" """ + ) diff --git a/tests/common/configuration/utils.py b/tests/common/configuration/utils.py index 73643561dc..4aff73067c 100644 --- a/tests/common/configuration/utils.py +++ b/tests/common/configuration/utils.py @@ -82,7 +82,8 @@ class SectionedConfiguration(BaseConfiguration): if TYPE_CHECKING: - def __init__(self, password: str = None) -> None: ... + def __init__(self, password: str = None) -> None: + ... 
@pytest.fixture(scope="function") diff --git a/tests/common/normalizers/test_import_normalizers.py b/tests/common/normalizers/test_import_normalizers.py index df6b973943..ea0d455e85 100644 --- a/tests/common/normalizers/test_import_normalizers.py +++ b/tests/common/normalizers/test_import_normalizers.py @@ -53,9 +53,9 @@ def test_import_normalizers() -> None: assert config["json"] == {"module": "dlt.common.normalizers.json.relational"} os.environ["SCHEMA__NAMING"] = "direct" - os.environ["SCHEMA__JSON_NORMALIZER"] = ( - '{"module": "tests.common.normalizers.custom_normalizers"}' - ) + os.environ[ + "SCHEMA__JSON_NORMALIZER" + ] = '{"module": "tests.common.normalizers.custom_normalizers"}' config, naming, json_normalizer = import_normalizers(explicit_normalizers()) assert config["names"] == "direct" assert config["json"] == {"module": "tests.common.normalizers.custom_normalizers"} diff --git a/tests/common/normalizers/test_json_relational.py b/tests/common/normalizers/test_json_relational.py index 502ce619dd..c27a31b951 100644 --- a/tests/common/normalizers/test_json_relational.py +++ b/tests/common/normalizers/test_json_relational.py @@ -476,7 +476,9 @@ def test_keeps_dlt_id(norm: RelationalNormalizer) -> None: def test_propagate_hardcoded_context(norm: RelationalNormalizer) -> None: row = {"level": 1, "list": ["a", "b", "c"], "comp": [{"_timestamp": "a"}]} - rows = list(norm._normalize_row(row, {"_timestamp": 1238.9, "_dist_key": "SENDER_3000"}, ("table",))) # type: ignore[arg-type] + rows = list( + norm._normalize_row(row, {"_timestamp": 1238.9, "_dist_key": "SENDER_3000"}, ("table",)) + ) # type: ignore[arg-type] # context is not added to root element root = next(t for t in rows if t[0][0] == "table")[1] assert "_timestamp" in root diff --git a/tests/common/runtime/test_telemetry.py b/tests/common/runtime/test_telemetry.py index eece36aae7..9ec4f7c49f 100644 --- a/tests/common/runtime/test_telemetry.py +++ b/tests/common/runtime/test_telemetry.py @@ -43,7 +43,8 @@ def __init__( sentry_dsn: str = "https://sentry.io", dlthub_telemetry_segment_write_key: str = "TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB", log_level: str = "CRITICAL", - ) -> None: ... + ) -> None: + ... 
def test_sentry_log_level() -> None: diff --git a/tests/common/schema/test_filtering.py b/tests/common/schema/test_filtering.py index 8cfac9309f..1fec6d0a61 100644 --- a/tests/common/schema/test_filtering.py +++ b/tests/common/schema/test_filtering.py @@ -120,7 +120,12 @@ def test_filter_parent_table_schema_update(schema: Schema) -> None: def _add_excludes(schema: Schema) -> None: bot_table = new_table("event_bot") - bot_table.setdefault("filters", {})["excludes"] = ["re:^metadata", "re:^is_flagged$", "re:^data", "re:^custom_data"] # type: ignore[typeddict-item] + bot_table.setdefault("filters", {})["excludes"] = [ + "re:^metadata", + "re:^is_flagged$", + "re:^data", + "re:^custom_data", + ] # type: ignore[typeddict-item] bot_table["filters"]["includes"] = [ TSimpleRegex("re:^data__custom$"), TSimpleRegex("re:^custom_data__included_object__"), diff --git a/tests/common/schema/test_schema.py b/tests/common/schema/test_schema.py index 54892eeae5..582899571b 100644 --- a/tests/common/schema/test_schema.py +++ b/tests/common/schema/test_schema.py @@ -198,7 +198,10 @@ def test_schema_descriptions_and_annotations(schema_storage: SchemaStorage): schema.tables["blocks"]["columns"]["_dlt_load_id"]["description"] == "load id coming from the extractor" ) - assert schema.tables["blocks"]["columns"]["_dlt_load_id"]["x-column-annotation"] == "column annotation preserved on save" # type: ignore[typeddict-item] + assert ( + schema.tables["blocks"]["columns"]["_dlt_load_id"]["x-column-annotation"] + == "column annotation preserved on save" + ) # type: ignore[typeddict-item] # mod and save schema.tables["blocks"]["description"] += "Saved" @@ -213,7 +216,9 @@ def test_schema_descriptions_and_annotations(schema_storage: SchemaStorage): assert loaded_schema.tables["blocks"]["columns"]["_dlt_load_id"]["description"].endswith( "Saved" ) - assert loaded_schema.tables["blocks"]["columns"]["_dlt_load_id"]["x-column-annotation"].endswith("Saved") # type: ignore[typeddict-item] + assert loaded_schema.tables["blocks"]["columns"]["_dlt_load_id"][ + "x-column-annotation" + ].endswith("Saved") # type: ignore[typeddict-item] def test_replace_schema_content() -> None: diff --git a/tests/conftest.py b/tests/conftest.py index 7e12990fd0..6e8c415a44 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -47,27 +47,35 @@ def pytest_configure(config): "TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB" ) delattr(run_configuration.RunConfiguration, "__init__") - run_configuration.RunConfiguration = dataclasses.dataclass(run_configuration.RunConfiguration, init=True, repr=False) # type: ignore + run_configuration.RunConfiguration = dataclasses.dataclass( + run_configuration.RunConfiguration, init=True, repr=False + ) # type: ignore # push telemetry to CI storage_configuration.LoadStorageConfiguration.load_volume_path = os.path.join( test_storage_root, "load" ) delattr(storage_configuration.LoadStorageConfiguration, "__init__") - storage_configuration.LoadStorageConfiguration = dataclasses.dataclass(storage_configuration.LoadStorageConfiguration, init=True, repr=False) # type: ignore[misc, call-overload] + storage_configuration.LoadStorageConfiguration = dataclasses.dataclass( + storage_configuration.LoadStorageConfiguration, init=True, repr=False + ) # type: ignore[misc, call-overload] storage_configuration.NormalizeStorageConfiguration.normalize_volume_path = os.path.join( test_storage_root, "normalize" ) # delete __init__, otherwise it will not be recreated by dataclass delattr(storage_configuration.NormalizeStorageConfiguration, "__init__") 
- storage_configuration.NormalizeStorageConfiguration = dataclasses.dataclass(storage_configuration.NormalizeStorageConfiguration, init=True, repr=False) # type: ignore[misc, call-overload] + storage_configuration.NormalizeStorageConfiguration = dataclasses.dataclass( + storage_configuration.NormalizeStorageConfiguration, init=True, repr=False + ) # type: ignore[misc, call-overload] storage_configuration.SchemaStorageConfiguration.schema_volume_path = os.path.join( test_storage_root, "schemas" ) delattr(storage_configuration.SchemaStorageConfiguration, "__init__") - storage_configuration.SchemaStorageConfiguration = dataclasses.dataclass(storage_configuration.SchemaStorageConfiguration, init=True, repr=False) # type: ignore[misc, call-overload] + storage_configuration.SchemaStorageConfiguration = dataclasses.dataclass( + storage_configuration.SchemaStorageConfiguration, init=True, repr=False + ) # type: ignore[misc, call-overload] assert run_configuration.RunConfiguration.config_files_storage_path == os.path.join( test_storage_root, "config/" diff --git a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py index d1ff98fc26..4c4e601eaf 100644 --- a/tests/extract/test_decorators.py +++ b/tests/extract/test_decorators.py @@ -211,7 +211,9 @@ def camelCase(): def test_columns_argument() -> None: - @dlt.resource(name="user", columns={"tags": {"data_type": "complex", "x-extra": "x-annotation"}}) # type: ignore[typeddict-unknown-key] + @dlt.resource( + name="user", columns={"tags": {"data_type": "complex", "x-extra": "x-annotation"}} + ) # type: ignore[typeddict-unknown-key] def get_users(): yield {"u": "u", "tags": [1, 2, 3]} @@ -360,21 +362,21 @@ def test_source_sections() -> None: assert list(resource_f_2()) == ["NAME OVERRIDDEN LEVEL"] # values in function name section - os.environ[f"{known_sections.SOURCES.upper()}__SECTION_SOURCE__INIT_SOURCE_F_1__VAL"] = ( - "SECTION INIT_SOURCE_F_1 LEVEL" - ) + os.environ[ + f"{known_sections.SOURCES.upper()}__SECTION_SOURCE__INIT_SOURCE_F_1__VAL" + ] = "SECTION INIT_SOURCE_F_1 LEVEL" assert list(init_source_f_1()) == ["SECTION INIT_SOURCE_F_1 LEVEL"] - os.environ[f"{known_sections.SOURCES.upper()}__SECTION_SOURCE__INIT_RESOURCE_F_2__VAL"] = ( - "SECTION INIT_RESOURCE_F_2 LEVEL" - ) + os.environ[ + f"{known_sections.SOURCES.upper()}__SECTION_SOURCE__INIT_RESOURCE_F_2__VAL" + ] = "SECTION INIT_RESOURCE_F_2 LEVEL" assert list(init_resource_f_2()) == ["SECTION INIT_RESOURCE_F_2 LEVEL"] - os.environ[f"{known_sections.SOURCES.upper()}__NAME_OVERRIDDEN__SOURCE_F_1__VAL"] = ( - "NAME SOURCE_F_1 LEVEL" - ) + os.environ[ + f"{known_sections.SOURCES.upper()}__NAME_OVERRIDDEN__SOURCE_F_1__VAL" + ] = "NAME SOURCE_F_1 LEVEL" assert list(source_f_1()) == ["NAME SOURCE_F_1 LEVEL"] - os.environ[f"{known_sections.SOURCES.upper()}__NAME_OVERRIDDEN__RESOURCE_F_2__VAL"] = ( - "NAME RESOURCE_F_2 LEVEL" - ) + os.environ[ + f"{known_sections.SOURCES.upper()}__NAME_OVERRIDDEN__RESOURCE_F_2__VAL" + ] = "NAME RESOURCE_F_2 LEVEL" assert list(resource_f_2()) == ["NAME RESOURCE_F_2 LEVEL"] @@ -423,9 +425,9 @@ def test_resources_injected_sections() -> None: ) # standalone resources must accept the injected sections for lookups - os.environ["SOURCES__EXTERNAL_RESOURCES__SOURCE_VAL"] = ( + os.environ[ "SOURCES__EXTERNAL_RESOURCES__SOURCE_VAL" - ) + ] = "SOURCES__EXTERNAL_RESOURCES__SOURCE_VAL" os.environ["SOURCES__EXTERNAL_RESOURCES__VAL"] = "SOURCES__EXTERNAL_RESOURCES__VAL" os.environ["SOURCES__SECTION_SOURCE__VAL"] = "SOURCES__SECTION_SOURCE__VAL" 
os.environ["SOURCES__NAME_OVERRIDDEN__VAL"] = "SOURCES__NAME_OVERRIDDEN__VAL" @@ -472,12 +474,12 @@ def test_resources_injected_sections() -> None: ] # now with environ values that specify source/resource name: the module of the source, the name of the resource - os.environ["SOURCES__EXTERNAL_RESOURCES__INIT_RESOURCE_F_2__VAL"] = ( + os.environ[ "SOURCES__EXTERNAL_RESOURCES__INIT_RESOURCE_F_2__VAL" - ) - os.environ["SOURCES__EXTERNAL_RESOURCES__RESOURCE_F_2__VAL"] = ( + ] = "SOURCES__EXTERNAL_RESOURCES__INIT_RESOURCE_F_2__VAL" + os.environ[ "SOURCES__EXTERNAL_RESOURCES__RESOURCE_F_2__VAL" - ) + ] = "SOURCES__EXTERNAL_RESOURCES__RESOURCE_F_2__VAL" s = with_external() with inject_section( ConfigSectionContext( diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index c8c5c6d137..fd73fc3103 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -271,12 +271,12 @@ def some_data_from_config( @pytest.mark.parametrize("item_type", ALL_DATA_ITEM_FORMATS) def test_optional_incremental_from_config(item_type: TDataItemFormat) -> None: - os.environ["SOURCES__TEST_INCREMENTAL__SOME_DATA_FROM_CONFIG__CREATED_AT__CURSOR_PATH"] = ( - "created_at" - ) - os.environ["SOURCES__TEST_INCREMENTAL__SOME_DATA_FROM_CONFIG__CREATED_AT__INITIAL_VALUE"] = ( - "2022-02-03T00:00:00Z" - ) + os.environ[ + "SOURCES__TEST_INCREMENTAL__SOME_DATA_FROM_CONFIG__CREATED_AT__CURSOR_PATH" + ] = "created_at" + os.environ[ + "SOURCES__TEST_INCREMENTAL__SOME_DATA_FROM_CONFIG__CREATED_AT__INITIAL_VALUE" + ] = "2022-02-03T00:00:00Z" p = dlt.pipeline(pipeline_name=uniq_id()) p.extract(some_data_from_config(1, item_type)) @@ -320,7 +320,9 @@ def test_optional_arg_from_spec_not_passed(item_type: TDataItemFormat) -> None: @configspec class SomeDataOverrideConfiguration(BaseConfiguration): - created_at: dlt.sources.incremental = dlt.sources.incremental("created_at", initial_value="2022-02-03T00:00:00Z") # type: ignore[type-arg] + created_at: dlt.sources.incremental = dlt.sources.incremental( + "created_at", initial_value="2022-02-03T00:00:00Z" + ) # type: ignore[type-arg] # provide what to inject via spec. 
the spec contain the default @@ -922,7 +924,7 @@ def by_event_type(event): @dlt.resource(primary_key="id", table_name=lambda i: i["type"]) def _get_shuffled_events( - last_created_at=dlt.sources.incremental("$", last_value_func=by_event_type) + last_created_at=dlt.sources.incremental("$", last_value_func=by_event_type), ): with open( "tests/normalize/cases/github.events.load_page_1_duck.json", "r", encoding="utf-8" @@ -954,7 +956,7 @@ def test_timezone_naive_datetime() -> None: def some_data( updated_at: dlt.sources.incremental[pendulum.DateTime] = dlt.sources.incremental( "updated_at", pendulum_start_dt - ) + ), ): data = [ {"updated_at": start_dt + timedelta(hours=1)}, @@ -1046,7 +1048,7 @@ def test_end_value_with_batches(item_type: TDataItemFormat) -> None: def batched_sequence( updated_at: dlt.sources.incremental[int] = dlt.sources.incremental( "updated_at", initial_value=1 - ) + ), ) -> Any: start = updated_at.last_value data = [{"updated_at": i} for i in range(start, start + 12)] @@ -1164,7 +1166,7 @@ def test_out_of_range_flags(item_type: TDataItemFormat) -> None: def descending( updated_at: dlt.sources.incremental[int] = dlt.sources.incremental( "updated_at", initial_value=10 - ) + ), ) -> Any: for chunk in chunks(list(reversed(range(48))), 10): data = [{"updated_at": i} for i in chunk] @@ -1180,7 +1182,7 @@ def descending( def ascending( updated_at: dlt.sources.incremental[int] = dlt.sources.incremental( "updated_at", initial_value=22, end_value=45 - ) + ), ) -> Any: for chunk in chunks(list(range(22, 500)), 10): data = [{"updated_at": i} for i in chunk] @@ -1196,7 +1198,7 @@ def ascending( def descending_single_item( updated_at: dlt.sources.incremental[int] = dlt.sources.incremental( "updated_at", initial_value=10 - ) + ), ) -> Any: for i in reversed(range(14)): data = [{"updated_at": i}] @@ -1212,7 +1214,7 @@ def descending_single_item( def ascending_single_item( updated_at: dlt.sources.incremental[int] = dlt.sources.incremental( "updated_at", initial_value=10, end_value=22 - ) + ), ) -> Any: for i in range(10, 500): data = [{"updated_at": i}] @@ -1245,14 +1247,19 @@ def test_get_incremental_value_type(item_type: TDataItemFormat) -> None: is pendulum.DateTime ) # typing has precedence - assert dlt.sources.incremental[pendulum.DateTime]("id", initial_value=1).get_incremental_value_type() is pendulum.DateTime # type: ignore[arg-type] + assert ( + dlt.sources.incremental[pendulum.DateTime]( + "id", initial_value=1 + ).get_incremental_value_type() + is pendulum.DateTime + ) # type: ignore[arg-type] # pass default value @dlt.resource def test_type( updated_at=dlt.sources.incremental[str]( # noqa: B008 "updated_at", allow_external_schedulers=True - ) + ), ): data = [{"updated_at": d} for d in [1, 2, 3]] yield data_to_item_format(item_type, data) @@ -1266,7 +1273,7 @@ def test_type( def test_type_2( updated_at: dlt.sources.incremental[int] = dlt.sources.incremental( "updated_at", allow_external_schedulers=True - ) + ), ): data = [{"updated_at": d} for d in [1, 2, 3]] yield data_to_item_format(item_type, data) @@ -1288,7 +1295,7 @@ def test_type_3(updated_at: dlt.sources.incremental[int]): # pass explicit value overriding default that is typed @dlt.resource def test_type_4( - updated_at=dlt.sources.incremental("updated_at", allow_external_schedulers=True) + updated_at=dlt.sources.incremental("updated_at", allow_external_schedulers=True), ): data = [{"updated_at": d} for d in [1, 2, 3]] yield data_to_item_format(item_type, data) @@ -1300,7 +1307,7 @@ def test_type_4( # no generic type 
information @dlt.resource def test_type_5( - updated_at=dlt.sources.incremental("updated_at", allow_external_schedulers=True) + updated_at=dlt.sources.incremental("updated_at", allow_external_schedulers=True), ): data = [{"updated_at": d} for d in [1, 2, 3]] yield data_to_item_format(item_type, data) @@ -1316,7 +1323,7 @@ def test_join_env_scheduler(item_type: TDataItemFormat) -> None: def test_type_2( updated_at: dlt.sources.incremental[int] = dlt.sources.incremental( "updated_at", allow_external_schedulers=True - ) + ), ): data = [{"updated_at": d} for d in [1, 2, 3]] yield data_to_item_format(item_type, data) @@ -1343,7 +1350,7 @@ def test_join_env_scheduler_pipeline(item_type: TDataItemFormat) -> None: def test_type_2( updated_at: dlt.sources.incremental[int] = dlt.sources.incremental( "updated_at", allow_external_schedulers=True - ) + ), ): data = [{"updated_at": d} for d in [1, 2, 3]] yield data_to_item_format(item_type, data) diff --git a/tests/helpers/airflow_tests/test_join_airflow_scheduler.py b/tests/helpers/airflow_tests/test_join_airflow_scheduler.py index 8c1992c506..aedf651b7b 100644 --- a/tests/helpers/airflow_tests/test_join_airflow_scheduler.py +++ b/tests/helpers/airflow_tests/test_join_airflow_scheduler.py @@ -30,7 +30,7 @@ def existing_incremental( updated_at: dlt.sources.incremental[pendulum.DateTime] = dlt.sources.incremental( "updated_at", allow_external_schedulers=True - ) + ), ): yield {"updated_at": CATCHUP_BEGIN, "state": updated_at.get_state()} @@ -73,7 +73,7 @@ def scheduled() -> None: def incremental_datetime( updated_at=dlt.sources.incremental[datetime.datetime]( "updated_at", allow_external_schedulers=True - ) + ), ): yield {"updated_at": CATCHUP_BEGIN, "state": updated_at.get_state()} @@ -90,7 +90,7 @@ def incremental_datetime( def incremental_datetime( updated_at=dlt.sources.incremental[datetime.date]( "updated_at", allow_external_schedulers=True - ) + ), ): yield { "updated_at": ensure_pendulum_date(CATCHUP_BEGIN), @@ -109,7 +109,7 @@ def incremental_datetime( def incremental_datetime( updated_at=dlt.sources.incremental[int]( "updated_at", allow_external_schedulers=True - ) + ), ): yield {"updated_at": CATCHUP_BEGIN.int_timestamp, "state": updated_at.get_state()} @@ -125,7 +125,7 @@ def incremental_datetime( def incremental_datetime( updated_at=dlt.sources.incremental[float]( "updated_at", allow_external_schedulers=True - ) + ), ): yield {"updated_at": CATCHUP_BEGIN.timestamp(), "state": updated_at.get_state()} @@ -139,7 +139,7 @@ def incremental_datetime( def incremental_datetime( updated_at=dlt.sources.incremental[str]( "updated_at", allow_external_schedulers=True - ) + ), ): yield { "updated_at": CATCHUP_BEGIN.in_tz("UTC").isoformat(), @@ -185,7 +185,7 @@ def unscheduled(): def incremental_datetime( updated_at=dlt.sources.incremental[datetime.datetime]( "updated_at", allow_external_schedulers=True - ) + ), ): yield { "updated_at": context["data_interval_start"], @@ -209,7 +209,7 @@ def incremental_datetime( def incremental_datetime( updated_at=dlt.sources.incremental[datetime.datetime]( "updated_at", allow_external_schedulers=True - ) + ), ): yield { "updated_at": now.subtract(hours=1, seconds=1), @@ -249,7 +249,7 @@ def scheduled(): def incremental_datetime( updated_at=dlt.sources.incremental[datetime.datetime]( "updated_at", allow_external_schedulers=True - ) + ), ): yield { "updated_at": context["data_interval_start"], diff --git a/tests/libs/test_parquet_writer.py b/tests/libs/test_parquet_writer.py index b1c19114fe..2673e8deef 100644 --- 
a/tests/libs/test_parquet_writer.py +++ b/tests/libs/test_parquet_writer.py @@ -133,10 +133,14 @@ def test_parquet_writer_all_data_fields() -> None: writer.write_data_item([data], TABLE_UPDATE_COLUMNS_SCHEMA) # We want to test precision for these fields is trimmed to millisecond - data["col4_precision"] = data["col4_precision"].replace( # type: ignore[attr-defined] + data["col4_precision"] = data[ + "col4_precision" + ].replace( # type: ignore[attr-defined] microsecond=int(str(data["col4_precision"].microsecond)[:3] + "000") # type: ignore[attr-defined] ) - data["col11_precision"] = data["col11_precision"].replace( # type: ignore[attr-defined] + data["col11_precision"] = data[ + "col11_precision" + ].replace( # type: ignore[attr-defined] microsecond=int(str(data["col11_precision"].microsecond)[:3] + "000") # type: ignore[attr-defined] ) diff --git a/tests/libs/test_pydantic.py b/tests/libs/test_pydantic.py index b7ca44c595..447f29a0c5 100644 --- a/tests/libs/test_pydantic.py +++ b/tests/libs/test_pydantic.py @@ -261,9 +261,14 @@ class User(BaseModel): # extra is modified assert model_freeze.__fields__["address"].annotation.__name__ == "UserAddressExtraAllow" # type: ignore[index] # annotated is preserved - assert issubclass(get_origin(model_freeze.__fields__["address"].rebuild_annotation()), Annotated) # type: ignore[arg-type, index] + assert issubclass( + get_origin(model_freeze.__fields__["address"].rebuild_annotation()), Annotated + ) # type: ignore[arg-type, index] # UserAddress is converted to UserAddressAllow only once - assert model_freeze.__fields__["address"].annotation is get_args(model_freeze.__fields__["unity"].annotation)[0] # type: ignore[index] + assert ( + model_freeze.__fields__["address"].annotation + is get_args(model_freeze.__fields__["unity"].annotation)[0] + ) # type: ignore[index] # print(User.__fields__) # print(User.__fields__["name"].annotation) diff --git a/tests/load/bigquery/test_bigquery_client.py b/tests/load/bigquery/test_bigquery_client.py index 2e7930c339..479249bdd7 100644 --- a/tests/load/bigquery/test_bigquery_client.py +++ b/tests/load/bigquery/test_bigquery_client.py @@ -362,8 +362,7 @@ def test_loading_errors(client: BigQueryClient, file_storage: FileStorage) -> No assert ( "Invalid BIGNUMERIC value:" " 578960446186580977117854925043439539266.34992332820282019728792003956564819968 Field:" - " parse_data__metadata__rasa_x_id;" - in job.exception() + " parse_data__metadata__rasa_x_id;" in job.exception() ) diff --git a/tests/load/filesystem/test_azure_credentials.py b/tests/load/filesystem/test_azure_credentials.py index 093cd6dd19..bbc37f5f22 100644 --- a/tests/load/filesystem/test_azure_credentials.py +++ b/tests/load/filesystem/test_azure_credentials.py @@ -46,9 +46,9 @@ def test_create_azure_sas_token_with_permissions(environment: Dict[str, str]) -> def test_azure_credentials_from_sas_token(environment: Dict[str, str]) -> None: environment["CREDENTIALS__AZURE_STORAGE_ACCOUNT_NAME"] = "fake_account_name" - environment["CREDENTIALS__AZURE_STORAGE_SAS_TOKEN"] = ( - "sp=rwdlacx&se=2021-01-01T00:00:00Z&sv=2019-12-12&sr=c&sig=1234567890" - ) + environment[ + "CREDENTIALS__AZURE_STORAGE_SAS_TOKEN" + ] = "sp=rwdlacx&se=2021-01-01T00:00:00Z&sv=2019-12-12&sr=c&sig=1234567890" config = resolve_configuration(AzureCredentials()) diff --git a/tests/load/pipeline/test_drop.py b/tests/load/pipeline/test_drop.py index cd18454d7c..58af9be60b 100644 --- a/tests/load/pipeline/test_drop.py +++ b/tests/load/pipeline/test_drop.py @@ -28,14 +28,14 @@ def _attach(pipeline: 
Pipeline) -> Pipeline: def droppable_source() -> List[DltResource]: @dlt.resource def droppable_a( - a: dlt.sources.incremental[int] = dlt.sources.incremental("a", 0) + a: dlt.sources.incremental[int] = dlt.sources.incremental("a", 0), ) -> Iterator[Dict[str, Any]]: yield dict(a=1, b=2, c=3) yield dict(a=4, b=23, c=24) @dlt.resource def droppable_b( - asd: dlt.sources.incremental[int] = dlt.sources.incremental("asd", 0) + asd: dlt.sources.incremental[int] = dlt.sources.incremental("asd", 0), ) -> Iterator[Dict[str, Any]]: # Child table yield dict(asd=2323, qe=555, items=[dict(m=1, n=2), dict(m=3, n=4)]) diff --git a/tests/load/qdrant/test_pipeline.py b/tests/load/qdrant/test_pipeline.py index c24c309ca6..2f0ccfd0c3 100644 --- a/tests/load/qdrant/test_pipeline.py +++ b/tests/load/qdrant/test_pipeline.py @@ -38,7 +38,11 @@ def some_data(): embed=["content"], ) - assert some_data.columns["content"] == {"name": "content", "data_type": "text", "x-qdrant-embed": True} # type: ignore[index] + assert some_data.columns["content"] == { + "name": "content", + "data_type": "text", + "x-qdrant-embed": True, + } # type: ignore[index] def test_basic_state_and_schema() -> None: diff --git a/tests/load/weaviate/test_pipeline.py b/tests/load/weaviate/test_pipeline.py index a4b5098fe7..f072d809b5 100644 --- a/tests/load/weaviate/test_pipeline.py +++ b/tests/load/weaviate/test_pipeline.py @@ -43,7 +43,11 @@ def some_data(): some_data, vectorize=["content"], ) - assert some_data.columns["content"] == {"name": "content", "data_type": "text", "x-weaviate-vectorize": True} # type: ignore[index] + assert some_data.columns["content"] == { + "name": "content", + "data_type": "text", + "x-weaviate-vectorize": True, + } # type: ignore[index] def test_basic_state_and_schema() -> None: diff --git a/tests/pipeline/test_pipeline_state.py b/tests/pipeline/test_pipeline_state.py index ee788367e1..c006c907b0 100644 --- a/tests/pipeline/test_pipeline_state.py +++ b/tests/pipeline/test_pipeline_state.py @@ -531,7 +531,10 @@ def test_migrate_state(test_storage: FileStorage) -> None: "_state_engine_version": 3, } migrate_state( - "test_pipeline", state_v3, state_v3["_state_engine_version"], STATE_ENGINE_VERSION # type: ignore + "test_pipeline", + state_v3, + state_v3["_state_engine_version"], + STATE_ENGINE_VERSION, # type: ignore ) assert state_v3["destination_name"] == "redshift" assert state_v3["destination_type"] == "dlt.destinations.redshift" @@ -545,7 +548,10 @@ def test_migrate_state(test_storage: FileStorage) -> None: "_state_engine_version": 3, } migrate_state( - "test_pipeline", state_v3, state_v3["_state_engine_version"], STATE_ENGINE_VERSION # type: ignore + "test_pipeline", + state_v3, + state_v3["_state_engine_version"], + STATE_ENGINE_VERSION, # type: ignore ) assert state_v3["destination_name"] == "redshift" assert state_v3["destination_type"] == "dlt.destinations.redshift" @@ -555,7 +561,10 @@ def test_migrate_state(test_storage: FileStorage) -> None: state_v3 = {"destination": None, "staging": None, "_state_engine_version": 3} migrate_state( - "test_pipeline", state_v3, state_v3["_state_engine_version"], STATE_ENGINE_VERSION # type: ignore + "test_pipeline", + state_v3, + state_v3["_state_engine_version"], + STATE_ENGINE_VERSION, # type: ignore ) assert "destination_name" not in state_v3 assert "destination_type" not in state_v3 @@ -564,7 +573,10 @@ def test_migrate_state(test_storage: FileStorage) -> None: state_v3 = {"_state_engine_version": 2} migrate_state( - "test_pipeline", state_v3, 
state_v3["_state_engine_version"], STATE_ENGINE_VERSION # type: ignore + "test_pipeline", + state_v3, + state_v3["_state_engine_version"], + STATE_ENGINE_VERSION, # type: ignore ) assert "destination_name" not in state_v3 assert "destination_type" not in state_v3 diff --git a/tests/pipeline/test_pipeline_trace.py b/tests/pipeline/test_pipeline_trace.py index cec578cb7b..de9e4527cb 100644 --- a/tests/pipeline/test_pipeline_trace.py +++ b/tests/pipeline/test_pipeline_trace.py @@ -450,7 +450,14 @@ def test_slack_hook(environment: DictStrStr) -> None: with requests_mock.mock() as m: m.post(hook_url, json={}) load_info = dlt.pipeline().run([1, 2, 3], table_name="data", destination="dummy") - assert slack_notify_load_success(load_info.pipeline.runtime_config.slack_incoming_hook, load_info, load_info.pipeline.last_trace) == 200 # type: ignore[attr-defined] + assert ( + slack_notify_load_success( + load_info.pipeline.runtime_config.slack_incoming_hook, + load_info, + load_info.pipeline.last_trace, + ) + == 200 + ) # type: ignore[attr-defined] assert m.called message = m.last_request.json() assert "rudolfix" in message["text"] @@ -462,7 +469,14 @@ def test_broken_slack_hook(environment: DictStrStr) -> None: environment["RUNTIME__SLACK_INCOMING_HOOK"] = "http://localhost:22" load_info = dlt.pipeline().run([1, 2, 3], table_name="data", destination="dummy") # connection error - assert slack_notify_load_success(load_info.pipeline.runtime_config.slack_incoming_hook, load_info, load_info.pipeline.last_trace) == -1 # type: ignore[attr-defined] + assert ( + slack_notify_load_success( + load_info.pipeline.runtime_config.slack_incoming_hook, + load_info, + load_info.pipeline.last_trace, + ) + == -1 + ) # type: ignore[attr-defined] # pipeline = dlt.pipeline() # assert pipeline.last_trace is not None # assert pipeline._trace is None diff --git a/tests/pipeline/test_schema_contracts.py b/tests/pipeline/test_schema_contracts.py index 2f2e6b6932..a80ff1030a 100644 --- a/tests/pipeline/test_schema_contracts.py +++ b/tests/pipeline/test_schema_contracts.py @@ -606,7 +606,9 @@ def dynamic_columns(item): if item["id"] == 2: return [{"name": "id", "data_type": "bigint", "nullable": True}] - @dlt.resource(name="items", table_name=lambda i: "items", schema_contract={"columns": column_mode}) # type: ignore + @dlt.resource( + name="items", table_name=lambda i: "items", schema_contract={"columns": column_mode} + ) # type: ignore def get_items(): yield { "id": 1, diff --git a/tests/tools/clean_redshift.py b/tests/tools/clean_redshift.py index f81407f74a..3db3abf67b 100644 --- a/tests/tools/clean_redshift.py +++ b/tests/tools/clean_redshift.py @@ -11,12 +11,14 @@ # list all schemas with connection.cursor() as curr: - curr.execute("""select s.nspname as table_schema, + curr.execute( + """select s.nspname as table_schema, s.oid as schema_id, u.usename as owner from pg_catalog.pg_namespace s join pg_catalog.pg_user u on u.usesysid = s.nspowner - order by table_schema;""") + order by table_schema;""" + ) schemas = [row[0] for row in curr.fetchall()] # delete all schemas, skipp expected errors