Rfix/load package extract (#790)
* implements package storage

* removes deprecated to_service_account_credentials

* allows to reopen buffered writer

* WIP extract state with other resources

* adds version hash to state

* adds load id to extract step

* adds typing based deprecations

* adds high precision timer for windows

* adds listing of packages and tests
rudolfix authored Nov 29, 2023
1 parent cfb6e66 commit 1f94a3b
Showing 57 changed files with 5,973 additions and 4,944 deletions.
6 changes: 3 additions & 3 deletions Makefile
@@ -48,15 +48,15 @@ dev: has-poetry

lint:
./check-package.sh
poetry run black ./ --diff --exclude=".*syntax_error.py|\.venv.*|_storage/.*"
# poetry run isort ./ --diff
poetry run mypy --config-file mypy.ini dlt tests
poetry run flake8 --max-line-length=200 dlt
poetry run flake8 --max-line-length=200 tests --exclude tests/reflection/module_cases
poetry run black dlt docs tests --diff --exclude=".*syntax_error.py|\.venv.*|_storage/.*"
# poetry run isort ./ --diff
# $(MAKE) lint-security

format:
poetry run black ./ --exclude=".*syntax_error.py|\.venv.*|_storage/.*"
poetry run black dlt docs tests --exclude=".*syntax_error.py|\.venv.*|_storage/.*"
# poetry run isort ./

test-and-lint-snippets:
26 changes: 15 additions & 11 deletions dlt/cli/pipeline_command.py
@@ -9,7 +9,7 @@
from dlt.common.runners import Venv
from dlt.common.runners.stdout import iter_stdout
from dlt.common.schema.utils import group_tables_by_resource, remove_defaults
from dlt.common.storages import FileStorage, LoadStorage
from dlt.common.storages import FileStorage, PackageStorage
from dlt.pipeline.helpers import DropCommand
from dlt.pipeline.exceptions import CannotRestorePipelineException

@@ -72,28 +72,30 @@ def pipeline_command(
return # No need to sync again

def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
extracted_files = p.list_extracted_resources()
if extracted_files:
extracted_packages = p.list_extracted_load_packages()
if extracted_packages:
fmt.echo(
"Has %s extracted files ready to be normalized"
% fmt.bold(str(len(extracted_files)))
"Has %s extracted packages ready to be normalized with following load ids:"
% fmt.bold(str(len(extracted_packages)))
)
for load_id in extracted_packages:
fmt.echo(load_id)
norm_packages = p.list_normalized_load_packages()
if norm_packages:
fmt.echo(
"Has %s load packages ready to be loaded with following load ids:"
"Has %s normalized packages ready to be loaded with following load ids:"
% fmt.bold(str(len(norm_packages)))
)
for load_id in norm_packages:
fmt.echo(load_id)
# load first (oldest) package
first_package_info = p.get_load_package_info(norm_packages[0])
if LoadStorage.is_package_partially_loaded(first_package_info):
if PackageStorage.is_package_partially_loaded(first_package_info):
fmt.warning(
"This package is partially loaded. Data in the destination may be modified."
)
fmt.echo()
return extracted_files, norm_packages
return extracted_packages, norm_packages

fmt.echo("Found pipeline %s in %s" % (fmt.bold(p.pipeline_name), fmt.bold(p.pipelines_dir)))

@@ -209,8 +211,8 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
fmt.echo("No failed jobs found")

if operation == "drop-pending-packages":
extracted_files, norm_packages = _display_pending_packages()
if len(extracted_files) == 0 and len(norm_packages) == 0:
extracted_packages, norm_packages = _display_pending_packages()
if len(extracted_packages) == 0 and len(norm_packages) == 0:
fmt.echo("No pending packages found")
if fmt.confirm("Delete the above packages?", default=False):
p.drop_pending_packages(with_partial_loads=True)
@@ -230,7 +232,9 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
if operation == "load-package":
load_id = command_kwargs.get("load_id")
if not load_id:
packages = sorted(p.list_normalized_load_packages())
packages = sorted(p.list_extracted_load_packages())
if not packages:
packages = sorted(p.list_normalized_load_packages())
if not packages:
packages = sorted(p.list_completed_load_packages())
if not packages:
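
The CLI changes above build on the pipeline's package-listing API. A minimal sketch of how these calls might be combined outside the CLI; the pipeline name is an assumption:

```python
import dlt

from dlt.common.storages import PackageStorage

# assumption: a pipeline named "my_pipeline" already exists in the local pipelines dir
p = dlt.attach(pipeline_name="my_pipeline")

# extracted packages wait for `normalize`, normalized packages wait for `load`
for load_id in p.list_extracted_load_packages():
    print("extracted:", load_id)

for load_id in p.list_normalized_load_packages():
    info = p.get_load_package_info(load_id)
    # a partially loaded package may already have modified data at the destination
    if PackageStorage.is_package_partially_loaded(info):
        print("partially loaded:", load_id)
```
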
6 changes: 0 additions & 6 deletions dlt/common/configuration/specs/gcp_credentials.py
@@ -1,6 +1,5 @@
import sys
from typing import Any, Final, List, Tuple, Union
from deprecated import deprecated

from dlt.common import json, pendulum
from dlt.common.configuration.specs.api_credentials import OAuth2Credentials
@@ -89,13 +88,8 @@ def on_resolved(self) -> None:
# must end with new line, otherwise won't be parsed by Crypto
self.private_key = TSecretValue(self.private_key + "\n")

@deprecated(reason="Use 'to_native_credentials' method instead")
def to_service_account_credentials(self) -> Any:
return self.to_native_credentials()

def to_native_credentials(self) -> Any:
"""Returns google.oauth2.service_account.Credentials"""

from google.oauth2.service_account import Credentials as ServiceAccountCredentials

if isinstance(self.private_key, ServiceAccountCredentials):
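
With the deprecated alias removed, callers go through `to_native_credentials()` directly. A short sketch, assuming the credentials are parsed from a local service-account key file:

```python
from dlt.common.configuration.specs import GcpServiceAccountCredentials

# assumption: a local service-account key file with project_id, client_email and private_key
creds = GcpServiceAccountCredentials()
creds.parse_native_representation(open("service_account.json").read())

# the deprecated to_service_account_credentials() alias is gone; use the generic accessor
google_creds = creds.to_native_credentials()  # google.oauth2.service_account.Credentials
```
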
6 changes: 4 additions & 2 deletions dlt/common/data_writers/buffered.py
@@ -1,5 +1,4 @@
import gzip
from functools import reduce
from typing import List, IO, Any, Optional, Type, TypeVar, Generic

from dlt.common.utils import uniq_id
@@ -75,7 +74,9 @@ def __init__(
raise InvalidFileNameTemplateException(file_name_template)

def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> int:
self._ensure_open()
if self._closed:
self._rotate_file()
self._closed = False
# rotate file if columns changed and writer does not allow for that
# as the only allowed change is to add new column (no updates/deletes), we detect the change by comparing lengths
if (
@@ -175,6 +176,7 @@ def _flush_and_close_file(self) -> None:
self.closed_files.append(self._file_name)
self._writer = None
self._file = None
self._file_name = None

def _ensure_open(self) -> None:
if self._closed:
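
The change above lets a closed buffered writer accept new items by rotating to a fresh file. A simplified, standalone illustration of that reopen-on-write behavior (not the dlt class itself):

```python
class ReopenableWriter:
    """Toy model of the behavior added above: close() flushes and clears the
    current file, and the next write transparently rotates to a new one."""

    def __init__(self) -> None:
        self._closed = True       # no file open yet; the first write rotates
        self._file_name = None
        self._rotations = 0

    def write(self, item: object) -> None:
        if self._closed:
            self._rotate_file()   # reopen by starting a fresh file
            self._closed = False
        print(f"writing {item!r} to {self._file_name}")

    def close(self) -> None:
        # mirrors _flush_and_close_file: the file name is cleared as well
        self._file_name = None
        self._closed = True

    def _rotate_file(self) -> None:
        self._rotations += 1
        self._file_name = f"items.{self._rotations}.jsonl"


w = ReopenableWriter()
w.write({"id": 1})   # opens items.1.jsonl
w.close()
w.write({"id": 2})   # reopens: rotates to items.2.jsonl instead of raising
```
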
5 changes: 4 additions & 1 deletion dlt/common/pipeline.py
@@ -176,7 +176,9 @@ class TPipelineLocalState(TypedDict, total=False):
first_run: bool
"""Indicates a first run of the pipeline, where run ends with successful loading of data"""
_last_extracted_at: datetime.datetime
"""Timestamp indicating when the state was synced with the destination. Lack of timestamp means not synced state."""
"""Timestamp indicating when the state was synced with the destination."""
_last_extracted_hash: str
"""Hash of state that was recently synced with destination"""


class TPipelineState(TypedDict, total=False):
@@ -193,6 +195,7 @@ class TPipelineState(TypedDict, total=False):

# properties starting with _ are not automatically applied to pipeline object when state is restored
_state_version: int
_version_hash: str
_state_engine_version: int
_local: TPipelineLocalState
"""A section of state that is not synchronized with the destination and does not participate in change merging and version control"""
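
The new fields can be illustrated with a hand-built state dict (values are made up; both TypedDicts are `total=False`, so partial dicts type-check):

```python
from dlt.common.pipeline import TPipelineLocalState, TPipelineState

local: TPipelineLocalState = {
    "first_run": False,
    "_last_extracted_hash": "kVSParz...",  # hash of the state most recently synced to the destination
}
state: TPipelineState = {
    "_state_version": 3,
    "_version_hash": "kVSParz...",         # new: content hash of the state itself
    "_state_engine_version": 4,
    "_local": local,
}
```
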
12 changes: 9 additions & 3 deletions dlt/common/schema/schema.py
@@ -588,7 +588,7 @@ def version_hash(self) -> str:
return utils.bump_version_if_modified(self.to_dict())[1]

@property
def previous_hashes(self) -> List[str]:
def previous_hashes(self) -> Sequence[str]:
"""Current version hash of the schema, recomputed from the actual content"""
return utils.bump_version_if_modified(self.to_dict())[3]

@@ -618,9 +618,15 @@ def to_pretty_yaml(self, remove_defaults: bool = True) -> str:
d = self.to_dict(remove_defaults=remove_defaults)
return yaml.dump(d, allow_unicode=True, default_flow_style=False, sort_keys=False)

def clone(self, update_normalizers: bool = False) -> "Schema":
"""Make a deep copy of the schema, possibly updating normalizers and identifiers in the schema if `update_normalizers` is True"""
def clone(self, with_name: str = None, update_normalizers: bool = False) -> "Schema":
"""Make a deep copy of the schema, optionally changing the name, and updating normalizers and identifiers in the schema if `update_normalizers` is True
Note that changing of name will break the previous version chain
"""
d = deepcopy(self.to_dict())
if with_name is not None:
d["name"] = with_name
d["previous_hashes"] = []
schema = Schema.from_dict(d) # type: ignore
# update normalizers and possibly all schema identifiers
if update_normalizers:
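
A short usage sketch of the new `with_name` parameter (the schema names are assumptions):

```python
from dlt.common.schema import Schema

schema = Schema("github_events")
renamed = schema.clone(with_name="github_events_v2")

assert renamed.name == "github_events_v2"
# per the docstring above, renaming breaks the previous version chain:
# the clone starts out with an empty `previous_hashes` list
```
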
17 changes: 8 additions & 9 deletions dlt/common/schema/utils.py
@@ -7,12 +7,12 @@

from dlt.common import json
from dlt.common.data_types import TDataType
from dlt.common.exceptions import DictValidationException, MissingDependencyException
from dlt.common.exceptions import DictValidationException
from dlt.common.normalizers import explicit_normalizers
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.normalizers.naming.snake_case import NamingConvention as SnakeCase
from dlt.common.typing import DictStrAny, REPattern, is_dict_generic_type
from dlt.common.validation import TCustomValidator, validate_dict, validate_dict_ignoring_xkeys
from dlt.common.typing import DictStrAny, REPattern
from dlt.common.validation import TCustomValidator, validate_dict_ignoring_xkeys
from dlt.common.schema import detections
from dlt.common.schema.typing import (
COLUMN_HINTS,
@@ -37,7 +37,6 @@
TTypeDetections,
TWriteDisposition,
TSchemaContract,
TSchemaContractDict,
)
from dlt.common.schema.exceptions import (
CannotCoerceColumnException,
@@ -168,8 +167,8 @@ def add_column_defaults(column: TColumnSchemaBase) -> TColumnSchema:
# return copy(column) # type: ignore


def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, str, List[str]]:
"""Bumps the `stored_schema` version and version hash if content modified, returns (new version, new hash, old hash) tuple"""
def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, str, Sequence[str]]:
"""Bumps the `stored_schema` version and version hash if content modified, returns (new version, new hash, old hash, 10 last hashes) tuple"""
hash_ = generate_version_hash(stored_schema)
previous_hash = stored_schema.get("version_hash")
if not previous_hash:
@@ -188,14 +187,14 @@ def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, st

def generate_version_hash(stored_schema: TStoredSchema) -> str:
# generates hash out of stored schema content, excluding the hash itself and version
schema_copy = deepcopy(stored_schema)
schema_copy = copy(stored_schema)
schema_copy.pop("version")
schema_copy.pop("version_hash", None)
schema_copy.pop("imported_version_hash", None)
schema_copy.pop("previous_hashes", None)
# ignore order of elements when computing the hash
content = json.dumps(schema_copy, sort_keys=True)
h = hashlib.sha3_256(content.encode("utf-8"))
content = json.dumpb(schema_copy, sort_keys=True)
h = hashlib.sha3_256(content)
# additionally check column order
table_names = sorted((schema_copy.get("tables") or {}).keys())
if table_names:
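
The hashing idea in `generate_version_hash` can be sketched with the standard library. This is only the core idea: dlt's own `json.dumpb` returns bytes directly, the digest encoding differs, and the real function also folds in column order:

```python
import hashlib
import json
from copy import copy

def content_hash(stored_schema: dict) -> str:
    # shallow copy is enough because keys are only popped, never mutated
    schema_copy = copy(stored_schema)
    for key in ("version", "version_hash", "imported_version_hash", "previous_hashes"):
        schema_copy.pop(key, None)
    # ignore order of elements when computing the hash
    content = json.dumps(schema_copy, sort_keys=True).encode("utf-8")
    return hashlib.sha3_256(content).hexdigest()
```
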
14 changes: 13 additions & 1 deletion dlt/common/storages/__init__.py
@@ -3,8 +3,15 @@
from .schema_storage import SchemaStorage
from .live_schema_storage import LiveSchemaStorage
from .normalize_storage import NormalizeStorage
from .load_storage import LoadStorage
from .load_package import (
ParsedLoadJobFileName,
LoadJobInfo,
LoadPackageInfo,
PackageStorage,
TJobState,
)
from .data_item_storage import DataItemStorage
from .load_storage import LoadStorage
from .configuration import (
LoadStorageConfiguration,
NormalizeStorageConfiguration,
@@ -28,6 +35,11 @@
"SchemaStorageConfiguration",
"TSchemaFileFormat",
"FilesystemConfiguration",
"ParsedLoadJobFileName",
"LoadJobInfo",
"LoadPackageInfo",
"PackageStorage",
"TJobState",
"fsspec_from_config",
"fsspec_filesystem",
]
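
After this change the load-package primitives are importable directly from the package root, e.g.:

```python
from dlt.common.storages import (
    LoadJobInfo,
    LoadPackageInfo,
    LoadStorage,
    PackageStorage,
    ParsedLoadJobFileName,
)
```
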
32 changes: 27 additions & 5 deletions dlt/common/storages/data_item_storage.py
@@ -1,9 +1,10 @@
from typing import Dict, Any, List, Generic
from pathlib import Path
from typing import Dict, Any, List, Sequence
from abc import ABC, abstractmethod

from dlt.common import logger
from dlt.common.schema import TTableSchemaColumns
from dlt.common.typing import TDataItems
from dlt.common.typing import StrAny, TDataItems
from dlt.common.data_writers import TLoaderFileFormat, BufferedDataWriter, DataWriter


@@ -44,10 +45,10 @@ def write_empty_file(
writer = self.get_writer(load_id, schema_name, table_name)
writer.write_empty_file(columns)

def close_writers(self, extract_id: str) -> None:
def close_writers(self, load_id: str) -> None:
# flush and close all files
for name, writer in self.buffered_writers.items():
if name.startswith(extract_id):
if name.startswith(load_id) and not writer.closed:
logger.debug(
f"Closing writer for {name} with file {writer._file} and actual name"
f" {writer._file_name}"
@@ -61,7 +62,28 @@ def closed_files(self) -> List[str]:

return files

def _write_temp_job_file(
self,
load_id: str,
table_name: str,
table: TTableSchemaColumns,
file_id: str,
rows: Sequence[StrAny],
) -> str:
"""Writes new file into new packages "new_jobs". Intended for testing"""
file_name = (
self._get_data_item_path_template(load_id, None, table_name) % file_id
+ "."
+ self.loader_file_format
)
format_spec = DataWriter.data_format_from_file_format(self.loader_file_format)
mode = "wb" if format_spec.is_binary_format else "w"
with self.storage.open_file(file_name, mode=mode) as f: # type: ignore[attr-defined]
writer = DataWriter.from_file_format(self.loader_file_format, f)
writer.write_all(table, rows)
return Path(file_name).name

@abstractmethod
def _get_data_item_path_template(self, load_id: str, schema_name: str, table_name: str) -> str:
# note: use %s for file id to create required template format
"""Returns a file template for item writer. note: use %s for file id to create required template format"""
pass
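
A sketch of how a concrete storage might satisfy the abstract template hook; the class name and directory layout are assumptions, only the `%s` file-id placeholder is required:

```python
from dlt.common.storages import DataItemStorage

class JsonlItemStorage(DataItemStorage):
    # assumption: hypothetical subclass, construction details omitted
    def _get_data_item_path_template(self, load_id: str, schema_name: str, table_name: str) -> str:
        # %s is substituted with the file id by the buffered writer machinery
        return f"{load_id}/new_jobs/{schema_name}.{table_name}.%s"
```
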
20 changes: 18 additions & 2 deletions dlt/common/storages/exceptions.py
@@ -1,7 +1,7 @@
import semver
from typing import Iterable

from dlt.common.exceptions import DltException
from dlt.common.exceptions import DltException, TerminalValueError
from dlt.common.data_writers import TLoaderFileFormat


@@ -43,11 +43,27 @@ def __init__(
)


class StorageMigrationError(StorageException):
def __init__(
self,
storage_path: str,
from_version: semver.VersionInfo,
target_version: semver.VersionInfo,
info: str,
) -> None:
self.storage_path = storage_path
self.from_version = from_version
self.target_version = target_version
super().__init__(
f"Storage {storage_path} with target v {target_version} at {from_version}: " + info
)


class LoadStorageException(StorageException):
pass


class JobWithUnsupportedWriterException(LoadStorageException):
class JobWithUnsupportedWriterException(LoadStorageException, TerminalValueError):
def __init__(
self, load_id: str, expected_file_formats: Iterable[TLoaderFileFormat], wrong_job: str
) -> None:
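
A hedged sketch of raising the new exception; the helper function and its arguments are hypothetical:

```python
import semver

from dlt.common.storages.exceptions import StorageMigrationError

def migrate_storage(storage_path: str, from_version: str, target_version: str) -> None:
    # assumption: no migration path exists between the two versions
    raise StorageMigrationError(
        storage_path,
        semver.VersionInfo.parse(from_version),
        semver.VersionInfo.parse(target_version),
        "no migration path defined between these versions",
    )
```
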
6 changes: 5 additions & 1 deletion dlt/common/storages/file_storage.py
@@ -286,7 +286,11 @@ def get_file_name_from_file_path(file_path: str) -> str:
@staticmethod
def validate_file_name_component(name: str) -> None:
# Universal platform bans several characters allowed in POSIX ie. | < \ or "COM1" :)
pathvalidate.validate_filename(name, platform="Universal")
try:
pathvalidate.validate_filename(name, platform="Universal")
except pathvalidate.error.ValidationError as val_ex:
if val_ex.reason != pathvalidate.ErrorReason.INVALID_LENGTH:
raise
# component cannot contain "."
if FILE_COMPONENT_INVALID_CHARACTERS.search(name):
raise pathvalidate.error.InvalidCharError(
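
With this change an overly long name component no longer raises, while invalid characters and dots still do. A small check, with example names as assumptions:

```python
import pathvalidate

from dlt.common.storages import FileStorage

FileStorage.validate_file_name_component("events_table")   # ok
FileStorage.validate_file_name_component("x" * 500)        # ok now: INVALID_LENGTH is ignored

try:
    FileStorage.validate_file_name_component("events.table")
except pathvalidate.error.InvalidCharError:
    print("dots are still rejected in a name component")
```
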