adds method to drop pending packages + cli
rudolfix committed Nov 18, 2023
1 parent 4bde8d0 commit 5207dc3
Showing 16 changed files with 205 additions and 496 deletions.
40 changes: 28 additions & 12 deletions dlt/cli/pipeline_command.py
@@ -1,5 +1,5 @@
import yaml
from typing import Any
from typing import Any, Sequence, Tuple
import dlt
from dlt.cli.exceptions import CliCommandException

@@ -9,8 +9,7 @@
from dlt.common.runners import Venv
from dlt.common.runners.stdout import iter_stdout
from dlt.common.schema.utils import group_tables_by_resource, remove_defaults
from dlt.common.storages.file_storage import FileStorage
from dlt.common.typing import DictStrAny
from dlt.common.storages import FileStorage, LoadStorage
from dlt.pipeline.helpers import DropCommand
from dlt.pipeline.exceptions import CannotRestorePipelineException

@@ -33,6 +32,8 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
        return

    try:
        if verbosity > 0:
            fmt.echo("Attaching to pipeline %s" % fmt.bold(pipeline_name))
        p = dlt.attach(pipeline_name=pipeline_name, pipelines_dir=pipelines_dir)
    except CannotRestorePipelineException as e:
        if operation not in {"sync", "drop"}:
@@ -52,6 +53,22 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
        if operation == "sync":
            return  # No need to sync again

    def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
        extracted_files = p.list_extracted_resources()
        if extracted_files:
            fmt.echo("Has %s extracted files ready to be normalized" % fmt.bold(str(len(extracted_files))))
        norm_packages = p.list_normalized_load_packages()
        if norm_packages:
            fmt.echo("Has %s load packages ready to be loaded with following load ids:" % fmt.bold(str(len(norm_packages))))
            for load_id in norm_packages:
                fmt.echo(load_id)
            # load first (oldest) package
            first_package_info = p.get_load_package_info(norm_packages[0])
            if LoadStorage.is_package_partially_loaded(first_package_info):
                fmt.warning("This package is partially loaded. Data in the destination may be modified.")
            fmt.echo()
        return extracted_files, norm_packages

fmt.echo("Found pipeline %s in %s" % (fmt.bold(p.pipeline_name), fmt.bold(p.pipelines_dir)))

if operation == "show":
@@ -102,15 +119,7 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
            fmt.echo("%s with %s table(s) and %s resource state slot(s)" % (fmt.bold(resource_name), fmt.bold(str(len(tables))), fmt.bold(str(res_state_slots))))
        fmt.echo()
        fmt.echo("Working dir content:")
        extracted_files = p.list_extracted_resources()
        if extracted_files:
            fmt.echo("Has %s extracted files ready to be normalized" % fmt.bold(str(len(extracted_files))))
        norm_packages = p.list_normalized_load_packages()
        if norm_packages:
            fmt.echo("Has %s load packages ready to be loaded with following load ids:" % fmt.bold(str(len(norm_packages))))
            for load_id in norm_packages:
                fmt.echo(load_id)
            fmt.echo()
        _display_pending_packages()
        loaded_packages = p.list_completed_load_packages()
        if loaded_packages:
            fmt.echo("Has %s completed load packages with following load ids:" % fmt.bold(str(len(loaded_packages))))
@@ -148,6 +157,13 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
        else:
            fmt.echo("No failed jobs found")

    if operation == "drop-pending-packages":
        extracted_files, norm_packages = _display_pending_packages()
        if len(extracted_files) == 0 and len(norm_packages) == 0:
            fmt.echo("No pending packages found")
        elif fmt.confirm("Delete the above packages?", default=False):
            p.drop_pending_packages(with_partial_loads=True)
            fmt.echo("Pending packages deleted")

if operation == "sync":
if fmt.confirm("About to drop the local state of the pipeline and reset all the schemas. The destination state, data and schemas are left intact. Proceed?", default=False):
Expand Down
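The new `drop-pending-packages` operation is the CLI entry point for `Pipeline.drop_pending_packages`. A minimal sketch of the same flow done programmatically, using only APIs added or touched in this commit (the pipeline name is a placeholder):

```python
import dlt
from dlt.common.storages import LoadStorage

# attach by name, as the CLI does
p = dlt.attach(pipeline_name="my_pipeline")  # hypothetical pipeline name

# inspect pending work: extracted files and normalized load packages
extracted_files = p.list_extracted_resources()
norm_packages = p.list_normalized_load_packages()
print("%s extracted file(s), %s normalized package(s)" % (len(extracted_files), len(norm_packages)))

if norm_packages:
    # load ids sort ascending, so the first one is the oldest package
    oldest = p.get_load_package_info(norm_packages[0])
    if LoadStorage.is_package_partially_loaded(oldest):
        print("oldest package is partially loaded - the destination may already hold some of its data")

# drop everything pending, including partially loaded packages (the CLI default)
p.drop_pending_packages(with_partial_loads=True)
```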
2 changes: 1 addition & 1 deletion dlt/common/storages/file_storage.py
@@ -125,7 +125,7 @@ def has_folder(self, relative_path: str) -> bool:
        return os.path.isdir(self.make_full_path(relative_path))

    def list_folder_files(self, relative_path: str, to_root: bool = True) -> List[str]:
        """List all files in ``relative_path`` folder
        """List all files in `relative_path` folder
        Args:
            relative_path (str): A path to folder, relative to storage root
35 changes: 23 additions & 12 deletions dlt/common/storages/load_storage.py
@@ -27,7 +27,7 @@

# folders to manage load jobs in a single load package
TJobState = Literal["new_jobs", "failed_jobs", "started_jobs", "completed_jobs"]
WORKING_FOLDERS = set(get_args(TJobState))
WORKING_FOLDERS: Set[TJobState] = set(get_args(TJobState))
TLoadPackageState = Literal["normalized", "loaded", "aborted"]


@@ -193,7 +193,7 @@ def write_temp_job_file(self, load_id: str, table_name: str, table: TTableSchema

    def load_package_schema(self, load_id: str) -> Schema:
        # load schema from a load package to be processed
        schema_path = join(self.get_package_path(load_id), LoadStorage.SCHEMA_FILE_NAME)
        schema_path = join(self.get_normalized_package_path(load_id), LoadStorage.SCHEMA_FILE_NAME)
        return self._load_schema(schema_path)

    def load_temp_schema(self, load_id: str) -> Schema:
@@ -211,14 +211,16 @@ def save_temp_schema_updates(self, load_id: str, schema_update: TSchemaTables) -
            json.dump(schema_update, f)

    def commit_temp_load_package(self, load_id: str) -> None:
        self.storage.rename_tree(load_id, self.get_package_path(load_id))
        self.storage.rename_tree(load_id, self.get_normalized_package_path(load_id))

    def list_packages(self) -> Sequence[str]:
    def list_normalized_packages(self) -> Sequence[str]:
        """Lists all packages that are normalized and will be loaded or are currently loaded"""
        loads = self.storage.list_folder_dirs(LoadStorage.NORMALIZED_FOLDER, to_root=False)
        # start from the oldest packages
        return sorted(loads)

    def list_completed_packages(self) -> Sequence[str]:
        """List packages that are completely loaded"""
        loads = self.storage.list_folder_dirs(LoadStorage.LOADED_FOLDER, to_root=False)
        # start from the oldest packages
        return sorted(loads)
@@ -264,7 +266,7 @@ def get_load_package_info(self, load_id: str) -> LoadPackageInfo:
        # check if package is completed or in process
        package_created_at: DateTime = None
        package_state: TLoadPackageState = "normalized"
        package_path = self.get_package_path(load_id)
        package_path = self.get_normalized_package_path(load_id)
        applied_update: TSchemaTables = {}
        if not self.storage.has_folder(package_path):
            package_path = self.get_completed_package_path(load_id)
@@ -291,7 +293,7 @@ def get_load_package_info(self, load_id: str) -> LoadPackageInfo:
        return LoadPackageInfo(load_id, self.storage.make_full_path(package_path), package_state, schema.name, applied_update, package_created_at, all_jobs)

    def begin_schema_update(self, load_id: str) -> Optional[TSchemaTables]:
        package_path = self.get_package_path(load_id)
        package_path = self.get_normalized_package_path(load_id)
        if not self.storage.has_folder(package_path):
            raise FileNotFoundError(package_path)
        schema_update_file = join(package_path, LoadStorage.SCHEMA_UPDATES_FILE_NAME)
@@ -303,7 +305,7 @@

    def commit_schema_update(self, load_id: str, applied_update: TSchemaTables) -> None:
        """Marks schema update as processed and stores the update that was applied at the destination"""
        load_path = self.get_package_path(load_id)
        load_path = self.get_normalized_package_path(load_id)
        schema_update_file = join(load_path, LoadStorage.SCHEMA_UPDATES_FILE_NAME)
        processed_schema_update_file = join(load_path, LoadStorage.APPLIED_SCHEMA_UPDATES_FILE_NAME)
        # delete initial schema update
@@ -344,7 +346,7 @@ def complete_job(self, load_id: str, file_name: str) -> str:
        return self._move_job(load_id, LoadStorage.STARTED_JOBS_FOLDER, LoadStorage.COMPLETED_JOBS_FOLDER, file_name)

    def complete_load_package(self, load_id: str, aborted: bool) -> None:
        load_path = self.get_package_path(load_id)
        load_path = self.get_normalized_package_path(load_id)
        has_failed_jobs = len(self.list_failed_jobs(load_id)) > 0
        # delete completed jobs
        if self.config.delete_completed_jobs and not has_failed_jobs:
@@ -367,7 +369,7 @@ def delete_completed_package(self, load_id: str) -> None:
    def wipe_normalized_packages(self) -> None:
        self.storage.delete_folder(self.NORMALIZED_FOLDER, recursively=True)

    def get_package_path(self, load_id: str) -> str:
    def get_normalized_package_path(self, load_id: str) -> str:
        return join(LoadStorage.NORMALIZED_FOLDER, load_id)

    def get_completed_package_path(self, load_id: str) -> str:
@@ -378,7 +380,7 @@ def job_elapsed_time_seconds(self, file_path: str, now_ts: float = None) -> floa

    def _save_schema(self, schema: Schema, load_id: str) -> str:
        dump = json.dumps(schema.to_dict())
        schema_path = join(self.get_package_path(load_id), LoadStorage.SCHEMA_FILE_NAME)
        schema_path = join(self.get_normalized_package_path(load_id), LoadStorage.SCHEMA_FILE_NAME)
        return self.storage.save(schema_path, dump)

    def _load_schema(self, schema_path: str) -> Schema:
@@ -388,14 +390,14 @@ def _load_schema(self, schema_path: str) -> Schema:

    def _move_job(self, load_id: str, source_folder: TJobState, dest_folder: TJobState, file_name: str, new_file_name: str = None) -> str:
        # ensure we move file names, not paths
        assert file_name == FileStorage.get_file_name_from_file_path(file_name)
        load_path = self.get_package_path(load_id)
        load_path = self.get_normalized_package_path(load_id)
        dest_path = join(load_path, dest_folder, new_file_name or file_name)
        self.storage.atomic_rename(join(load_path, source_folder, file_name), dest_path)
        # print(f"{join(load_path, source_folder, file_name)} -> {dest_path}")
        return self.storage.make_full_path(dest_path)

    def _get_job_folder_path(self, load_id: str, folder: TJobState) -> str:
        return join(self.get_package_path(load_id), folder)
        return join(self.get_normalized_package_path(load_id), folder)

    def _get_job_file_path(self, load_id: str, folder: TJobState, file_name: str) -> str:
        return join(self._get_job_folder_path(load_id, folder), file_name)
@@ -430,6 +432,15 @@ def build_job_file_name(self, table_name: str, file_id: str, retry_count: int =
            return fn + f".{format_spec.file_extension}"
        return fn

    @staticmethod
    def is_package_partially_loaded(package_info: LoadPackageInfo) -> bool:
        """Checks if package is partially loaded - has jobs that are not new."""
        if package_info.state == "normalized":
            pending_jobs: Sequence[TJobState] = ["new_jobs"]
        else:
            pending_jobs = ["completed_jobs", "failed_jobs"]
        return sum(len(package_info.jobs[job_state]) for job_state in WORKING_FOLDERS if job_state not in pending_jobs) > 0

    @staticmethod
    def parse_job_file_name(file_name: str) -> ParsedLoadJobFileName:
        p = Path(file_name)
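`is_package_partially_loaded` decides from the job counts in a `LoadPackageInfo` whether any job moved past the pending folders, i.e. whether the package already touched the destination. A usage sketch, assuming an initialized `LoadStorage` instance and an existing normalized package (`warn_if_partial` is a hypothetical helper):

```python
from dlt.common.storages import LoadStorage

def warn_if_partial(load_storage: LoadStorage, load_id: str) -> None:
    # package info carries jobs grouped by state folder:
    # new_jobs, started_jobs, failed_jobs, completed_jobs
    package_info = load_storage.get_load_package_info(load_id)
    if LoadStorage.is_package_partially_loaded(package_info):
        # dropping this package would not undo data already written
        print("package %s is partially loaded" % load_id)
```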
3 changes: 3 additions & 0 deletions dlt/destinations/dummy/configuration.py
@@ -20,6 +20,8 @@ class DummyClientConfiguration(DestinationClientConfiguration):
    fail_prob: float = 0.0
    retry_prob: float = 0.0
    completed_prob: float = 0.0
    exception_prob: float = 0.0
    """probability of exception when checking job status"""
    timeout: float = 10.0
    fail_in_init: bool = True

@@ -35,6 +37,7 @@ def __init__(
        fail_prob: float = None,
        retry_prob: float = None,
        completed_prob: float = None,
        exception_prob: float = None,
        timeout: float = None,
        fail_in_init: bool = None,
    ) -> None:
15 changes: 9 additions & 6 deletions dlt/destinations/dummy/dummy.py
@@ -24,17 +24,20 @@ def __init__(self, file_name: str, config: DummyClientConfiguration) -> None:
        self._exception: str = None
        self.start_time: float = pendulum.now().timestamp()
        super().__init__(file_name)
        # if config.fail_in_init:
        s = self.state()
        if s == "failed":
            raise DestinationTerminalException(self._exception)
        if s == "retry":
            raise DestinationTransientException(self._exception)
        if config.fail_in_init:
            s = self.state()
            if s == "failed":
                raise DestinationTerminalException(self._exception)
            if s == "retry":
                raise DestinationTransientException(self._exception)

    def state(self) -> TLoadJobState:
        # this should poll the server for a job status, here we simulate various outcomes
        if self._status == "running":
            c_r = random.random()
            if self.config.exception_prob >= c_r:
                raise DestinationTransientException("Dummy job status raised exception")
            n = pendulum.now().timestamp()
            if n - self.start_time > self.config.timeout:
                self._status = "failed"
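`exception_prob` makes the dummy job's `state()` raise a `DestinationTransientException` with the given probability, so loader retry paths can be exercised without a real destination. A sketch, assuming the usual environment-variable configuration; the unscoped keys below are an assumption, scoped variants such as `DESTINATION__DUMMY__EXCEPTION_PROB` should also resolve:

```python
import os

import dlt

os.environ["COMPLETED_PROB"] = "1.0"  # let jobs complete once polled
os.environ["EXCEPTION_PROB"] = "0.1"  # ~10% of status checks raise a transient error

# hypothetical pipeline; transient errors are retried by the load step
p = dlt.pipeline(pipeline_name="dummy_exceptions", destination="dummy")
p.run([{"id": 1}], table_name="items")
```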
4 changes: 2 additions & 2 deletions dlt/load/load.py
@@ -387,7 +387,7 @@ def run(self, pool: Optional[Executor]) -> TRunMetrics:

        logger.info("Running file loading")
        # get list of loads and order by name ASC to execute schema updates
        loads = self.load_storage.list_packages()
        loads = self.load_storage.list_normalized_packages()
        logger.info(f"Found {len(loads)} load packages")
        if len(loads) == 0:
            return TRunMetrics(True, 0)
@@ -404,7 +404,7 @@ def run(self, pool: Optional[Executor]) -> TRunMetrics:
        with self.collector(f"Load {schema.name} in {load_id}"):
            self.load_single_package(load_id, schema)

        return TRunMetrics(False, len(self.load_storage.list_packages()))
        return TRunMetrics(False, len(self.load_storage.list_normalized_packages()))

    def get_load_info(self, pipeline: SupportsPipeline, started_at: datetime.datetime = None) -> LoadInfo:
        # TODO: LoadInfo should hold many datasets
18 changes: 16 additions & 2 deletions dlt/pipeline/pipeline.py
@@ -595,7 +595,7 @@ def has_data(self) -> bool:
    @property
    def has_pending_data(self) -> bool:
        """Tells if the pipeline contains any extracted files or pending load packages"""
        return bool(self.list_normalized_load_packages() or self.list_extracted_resources())
        return len(self.list_normalized_load_packages()) > 0 or len(self.list_extracted_resources()) > 0

    @property
    def schemas(self) -> SchemaStorage:
@@ -623,7 +623,7 @@ def list_extracted_resources(self) -> Sequence[str]:

    def list_normalized_load_packages(self) -> Sequence[str]:
        """Returns a list of all load packages ids that are or will be loaded."""
        return self._get_load_storage().list_packages()
        return self._get_load_storage().list_normalized_packages()

    def list_completed_load_packages(self) -> Sequence[str]:
        """Returns a list of all load package ids that are completely loaded"""
@@ -637,6 +637,20 @@ def list_failed_jobs_in_package(self, load_id: str) -> Sequence[LoadJobInfo]:
        """List all failed jobs and associated error messages for a specified `load_id`"""
        return self._get_load_storage().get_load_package_info(load_id).jobs.get("failed_jobs", [])

    def drop_pending_packages(self, with_partial_loads: bool = True) -> None:
        """Deletes all extracted and normalized packages, including those that are partially loaded by default"""
        # delete normalized packages
        load_storage = self._get_load_storage()
        for load_id in load_storage.list_normalized_packages():
            package_info = load_storage.get_load_package_info(load_id)
            if LoadStorage.is_package_partially_loaded(package_info) and not with_partial_loads:
                continue
            package_path = load_storage.get_normalized_package_path(load_id)
            load_storage.storage.delete_folder(package_path, recursively=True)
        # delete extracted files
        normalize_storage = self._get_normalize_storage()
        normalize_storage.delete_extracted_files(normalize_storage.list_files_to_normalize_sorted())

    @with_schemas_sync
    def sync_schema(self, schema_name: str = None, credentials: Any = None) -> TSchemaTables:
        """Synchronizes the schema `schema_name` with the destination. If no name is provided, the default schema will be synchronized."""
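A usage sketch for the new method, assuming an attached pipeline. With `with_partial_loads=False`, packages that already wrote to the destination are kept; extracted files are deleted either way:

```python
import dlt

p = dlt.attach(pipeline_name="my_pipeline")  # hypothetical pipeline name

# drop only packages that have not started loading yet
p.drop_pending_packages(with_partial_loads=False)

# pending data can remain only if partially loaded packages were preserved
if p.has_pending_data:
    print("partially loaded packages were kept")
```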