Merge branch 'devel' into sthor/parametrized-destination
rudolfix committed Nov 18, 2023
2 parents 1f16c8f + 582448c commit 2541d49
Showing 36 changed files with 492 additions and 593 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -23,7 +23,7 @@ help:
@echo " runs flake and mypy"
@echo " test"
@echo " tests all the components including destinations"
@echo " test-local"
@echo " test-load-local"
@echo " tests all components using local destinations: duckdb and postgres"
@echo " test-common"
@echo " tests common components"
57 changes: 39 additions & 18 deletions dlt/cli/_dlt.py
@@ -25,13 +25,22 @@
pass


DEBUG_FLAG = False


def on_exception(ex: Exception, info: str) -> None:
    click.secho(str(ex), err=True, fg="red")
    fmt.note("Please refer to %s for further assistance" % fmt.bold(info))
    if DEBUG_FLAG:
        raise ex


@utils.track_command("init", False, "source_name", "destination_name")
def init_command_wrapper(source_name: str, destination_name: str, use_generic_template: bool, repo_location: str, branch: str) -> int:
try:
init_command(source_name, destination_name, use_generic_template, repo_location, branch)
except Exception as ex:
click.secho(str(ex), err=True, fg="red")
fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_INIT_DOCS_URL))
on_exception(ex, DLT_INIT_DOCS_URL)
return -1
return 0

@@ -41,8 +50,7 @@ def list_verified_sources_command_wrapper(repo_location: str, branch: str) -> in
try:
list_verified_sources_command(repo_location, branch)
except Exception as ex:
click.secho(str(ex), err=True, fg="red")
fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_INIT_DOCS_URL))
on_exception(ex, DLT_INIT_DOCS_URL)
return -1
return 0

@@ -66,9 +74,8 @@ def deploy_command_wrapper(pipeline_script_path: str, deployment_method: str, re
**kwargs
)
except (CannotRestorePipelineException, PipelineWasNotRun) as ex:
click.secho(str(ex), err=True, fg="red")
fmt.note("You must run the pipeline locally successfully at least once in order to deploy it.")
fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_DEPLOY_DOCS_URL))
on_exception(ex, DLT_DEPLOY_DOCS_URL)
return -2
except InvalidGitRepositoryError:
click.secho(
@@ -89,10 +96,8 @@ def deploy_command_wrapper(pipeline_script_path: str, deployment_method: str, re
)
return -4
except Exception as ex:
click.secho(str(ex), err=True, fg="red")
fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_DEPLOY_DOCS_URL))
on_exception(ex, DLT_DEPLOY_DOCS_URL)
return -5
# TODO: display stack trace if with debug flag
return 0


@@ -106,10 +111,10 @@ def pipeline_command_wrapper(
except CannotRestorePipelineException as ex:
click.secho(str(ex), err=True, fg="red")
click.secho("Try command %s to restore the pipeline state from destination" % fmt.bold(f"dlt pipeline {pipeline_name} sync"))
return 1
return -1
except Exception as ex:
click.secho(str(ex), err=True, fg="red")
return 1
on_exception(ex, DLT_PIPELINE_COMMAND_DOCS_URL)
return -2


@utils.track_command("schema", False, "operation")
@@ -133,8 +138,7 @@ def telemetry_status_command_wrapper() -> int:
try:
telemetry_status_command()
except Exception as ex:
click.secho(str(ex), err=True, fg="red")
fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_TELEMETRY_DOCS_URL))
on_exception(ex, DLT_TELEMETRY_DOCS_URL)
return -1
return 0

@@ -144,8 +148,7 @@ def telemetry_change_status_command_wrapper(enabled: bool) -> int:
try:
change_telemetry_status_command(enabled)
except Exception as ex:
click.secho(str(ex), err=True, fg="red")
fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_TELEMETRY_DOCS_URL))
on_exception(ex, DLT_TELEMETRY_DOCS_URL)
return -1
return 0

@@ -186,12 +189,28 @@ def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespac
fmt.ALWAYS_CHOOSE_DEFAULT = True


class DebugAction(argparse.Action):
    def __init__(self, option_strings: Sequence[str], dest: Any = argparse.SUPPRESS, default: Any = argparse.SUPPRESS, help: str = None) -> None:  # noqa
        super(DebugAction, self).__init__(
            option_strings=option_strings,
            dest=dest,
            default=default,
            nargs=0,
            help=help
        )

    def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str = None) -> None:
        global DEBUG_FLAG
        # will show stack traces (and maybe more debug things)
        DEBUG_FLAG = True


def main() -> int:
parser = argparse.ArgumentParser(description="Creates, adds, inspects and deploys dlt pipelines.", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--version', action="version", version='%(prog)s {version}'.format(version=__version__))
parser.add_argument('--disable-telemetry', action=TelemetryAction, help="Disables telemetry before command is executed")
parser.add_argument('--enable-telemetry', action=TelemetryAction, help="Enables telemetry before command is executed")
parser.add_argument('--non-interactive', action=NonInteractiveAction, help="Non interactive mode. Default choices are automatically made for confirmations and prompts.")
parser.add_argument('--debug', action=DebugAction, help="Displays full stack traces on exceptions.")
subparsers = parser.add_subparsers(dest="command")

init_cmd = subparsers.add_parser("init", help="Creates a pipeline project in the current folder by adding existing verified source or creating a new one from template.")
@@ -239,8 +258,6 @@ def main() -> int:
pipe_cmd.add_argument("pipeline_name", nargs='?', help="Pipeline name")
pipe_cmd.add_argument("--pipelines-dir", help="Pipelines working directory", default=None)
pipe_cmd.add_argument("--verbose", "-v", action='count', default=0, help="Provides more information for certain commands.", dest="verbosity")
# pipe_cmd.add_argument("--dataset-name", help="Dataset name used to sync destination when local pipeline state is missing.")
# pipe_cmd.add_argument("--destination", help="Destination name used to sync when local pipeline state is missing.")

pipeline_subparsers = pipe_cmd.add_subparsers(dest="operation", required=False)

@@ -251,6 +268,7 @@ def main() -> int:
pipeline_subparsers.add_parser("info", help="Displays state of the pipeline, use -v or -vv for more info")
pipeline_subparsers.add_parser("show", help="Generates and launches Streamlit app with the loading status and dataset explorer")
pipeline_subparsers.add_parser("failed-jobs", help="Displays information on all the failed loads in all completed packages, failed jobs and associated error messages")
pipeline_subparsers.add_parser("drop-pending-packages", help="Deletes all extracted and normalized packages including those that are partially loaded.")
pipeline_subparsers.add_parser(
"sync",
help="Drops the local state of the pipeline and resets all the schemas and restores it from destination. The destination state, data and schemas are left intact.",
@@ -290,6 +308,9 @@ def main() -> int:
return pipeline_command_wrapper("list", "-", args.pipelines_dir, args.verbosity)
else:
command_kwargs = dict(args._get_kwargs())
if not command_kwargs.get("pipeline_name"):
pipe_cmd.print_usage()
return -1
command_kwargs['operation'] = args.operation or "info"
del command_kwargs["command"]
del command_kwargs["list_pipelines"]
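For context on the change above: error handling in the CLI wrappers is now funneled through on_exception, gated by a module-level DEBUG_FLAG that the new --debug option flips via a zero-argument argparse action. Below is a minimal, self-contained sketch of that pattern outside of dlt; the --fail option and the docs URL passed to on_exception are invented here purely to exercise the control flow.

import argparse
import sys
from typing import Any, Sequence

DEBUG_FLAG = False


class DebugAction(argparse.Action):
    """Zero-argument flag that flips a module-level debug switch."""
    def __init__(self, option_strings: Sequence[str], dest: Any = argparse.SUPPRESS,
                 default: Any = argparse.SUPPRESS, help: str = None) -> None:  # noqa
        super().__init__(option_strings=option_strings, dest=dest, default=default, nargs=0, help=help)

    def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace,
                 values: Any, option_string: str = None) -> None:
        global DEBUG_FLAG
        DEBUG_FLAG = True


def on_exception(ex: Exception, info: str) -> None:
    # print a short error by default; re-raise for a full traceback in debug mode
    print(f"ERROR: {ex} (see {info})", file=sys.stderr)
    if DEBUG_FLAG:
        raise ex


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--debug", action=DebugAction, help="Displays full stack traces on exceptions.")
    parser.add_argument("--fail", action="store_true", help="Simulate a failing command.")
    args = parser.parse_args()
    try:
        if args.fail:
            raise RuntimeError("something went wrong")
    except Exception as ex:
        on_exception(ex, "https://dlthub.com/docs")
        return -1
    return 0


if __name__ == "__main__":
    sys.exit(main())

Running this sketch with --fail prints only the short error note; adding --debug re-raises from the handler, so the full stack trace reaches the top level, which is the behavior the commit adds to the dlt CLI wrappers.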
40 changes: 28 additions & 12 deletions dlt/cli/pipeline_command.py
@@ -1,5 +1,5 @@
import yaml
from typing import Any
from typing import Any, Sequence, Tuple
import dlt
from dlt.cli.exceptions import CliCommandException

@@ -9,8 +9,7 @@
from dlt.common.runners import Venv
from dlt.common.runners.stdout import iter_stdout
from dlt.common.schema.utils import group_tables_by_resource, remove_defaults
from dlt.common.storages.file_storage import FileStorage
from dlt.common.typing import DictStrAny
from dlt.common.storages import FileStorage, LoadStorage
from dlt.pipeline.helpers import DropCommand
from dlt.pipeline.exceptions import CannotRestorePipelineException

@@ -33,6 +32,8 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
return

try:
if verbosity > 0:
fmt.echo("Attaching to pipeline %s" % fmt.bold(pipeline_name))
p = dlt.attach(pipeline_name=pipeline_name, pipelines_dir=pipelines_dir)
except CannotRestorePipelineException as e:
if operation not in {"sync", "drop"}:
@@ -52,6 +53,22 @@
if operation == "sync":
return # No need to sync again

def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
    extracted_files = p.list_extracted_resources()
    if extracted_files:
        fmt.echo("Has %s extracted files ready to be normalized" % fmt.bold(str(len(extracted_files))))
    norm_packages = p.list_normalized_load_packages()
    if norm_packages:
        fmt.echo("Has %s load packages ready to be loaded with following load ids:" % fmt.bold(str(len(norm_packages))))
        for load_id in norm_packages:
            fmt.echo(load_id)
        # load first (oldest) package
        first_package_info = p.get_load_package_info(norm_packages[0])
        if LoadStorage.is_package_partially_loaded(first_package_info):
            fmt.warning("This package is partially loaded. Data in the destination may be modified.")
    fmt.echo()
    return extracted_files, norm_packages

fmt.echo("Found pipeline %s in %s" % (fmt.bold(p.pipeline_name), fmt.bold(p.pipelines_dir)))

if operation == "show":
@@ -102,15 +119,7 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
fmt.echo("%s with %s table(s) and %s resource state slot(s)" % (fmt.bold(resource_name), fmt.bold(str(len(tables))), fmt.bold(str(res_state_slots))))
fmt.echo()
fmt.echo("Working dir content:")
extracted_files = p.list_extracted_resources()
if extracted_files:
fmt.echo("Has %s extracted files ready to be normalized" % fmt.bold(str(len(extracted_files))))
norm_packages = p.list_normalized_load_packages()
if norm_packages:
fmt.echo("Has %s load packages ready to be loaded with following load ids:" % fmt.bold(str(len(norm_packages))))
for load_id in norm_packages:
fmt.echo(load_id)
fmt.echo()
_display_pending_packages()
loaded_packages = p.list_completed_load_packages()
if loaded_packages:
fmt.echo("Has %s completed load packages with following load ids:" % fmt.bold(str(len(loaded_packages))))
@@ -148,6 +157,13 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
else:
fmt.echo("No failed jobs found")

if operation == "drop-pending-packages":
    extracted_files, norm_packages = _display_pending_packages()
    if len(extracted_files) == 0 and len(norm_packages) == 0:
        fmt.echo("No pending packages found")
    if fmt.confirm("Delete the above packages?", default=False):
        p.drop_pending_packages(with_partial_loads=True)
        fmt.echo("Pending packages deleted")

if operation == "sync":
if fmt.confirm("About to drop the local state of the pipeline and reset all the schemas. The destination state, data and schemas are left intact. Proceed?", default=False):
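To illustrate the new drop-pending-packages operation, here is a rough sketch of the equivalent flow driven through the pipeline API, using only calls that appear in the diff above; the pipeline name is made up for illustration, and the real command additionally asks for confirmation via fmt.confirm before deleting anything.

import dlt
from dlt.common.storages import LoadStorage

# attach to an existing local pipeline by name (name is illustrative)
p = dlt.attach(pipeline_name="chess_pipeline")

extracted_files = p.list_extracted_resources()
norm_packages = p.list_normalized_load_packages()

if not extracted_files and not norm_packages:
    print("No pending packages found")
else:
    print(f"{len(extracted_files)} extracted file(s), {len(norm_packages)} normalized package(s)")
    if norm_packages:
        # warn if the oldest package was already partially loaded into the destination
        first_package_info = p.get_load_package_info(norm_packages[0])
        if LoadStorage.is_package_partially_loaded(first_package_info):
            print("Warning: the oldest package is partially loaded; destination data may be modified.")
    # remove all extracted and normalized packages, including partially loaded ones
    p.drop_pending_packages(with_partial_loads=True)
    print("Pending packages deleted")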
19 changes: 19 additions & 0 deletions dlt/common/schema/detections.py
@@ -36,6 +36,25 @@ def is_iso_timestamp(t: Type[Any], v: Any) -> Optional[TDataType]:
return None


def is_iso_date(t: Type[Any], v: Any) -> Optional[TDataType]:
    # only strings can be converted
    if not issubclass(t, str):
        return None
    if not v:
        return None
    # don't cast iso timestamps as dates
    if is_iso_timestamp(t, v):
        return None
    # strict autodetection of iso dates
    try:
        dtv = parse_iso_like_datetime(v)
        if isinstance(dtv, datetime.date):
            return "date"
    except Exception:
        pass
    return None


def is_large_integer(t: Type[Any], v: Any) -> Optional[TDataType]:
# only ints can be converted
if issubclass(t, int):
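As a quick illustration of the new detection, the sketch below shows the values it is expected to accept and reject, assuming it is imported from dlt.common.schema.detections as defined above; the sample values are illustrative and the exact behavior ultimately depends on parse_iso_like_datetime.

from dlt.common.schema.detections import is_iso_date, is_iso_timestamp

# a bare ISO date string is detected as a date
print(is_iso_date(str, "2023-11-18"))                 # expected: "date"
# a full ISO timestamp is handled by is_iso_timestamp, not by is_iso_date
print(is_iso_timestamp(str, "2023-11-18T10:00:00"))   # expected: "timestamp"
print(is_iso_date(str, "2023-11-18T10:00:00"))        # expected: None
# non-string values and empty strings are never detected
print(is_iso_date(int, 20231118))                     # expected: None
print(is_iso_date(str, ""))                           # expected: None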
2 changes: 1 addition & 1 deletion dlt/common/schema/typing.py
@@ -25,7 +25,7 @@
"""Known hints of a column used to declare hint regexes."""
TWriteDisposition = Literal["skip", "append", "replace", "merge"]
TTableFormat = Literal["iceberg"]
TTypeDetections = Literal["timestamp", "iso_timestamp", "large_integer", "hexbytes_to_text", "wei_to_double"]
TTypeDetections = Literal["timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double"]
TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]]
TColumnNames = Union[str, Sequence[str]]
"""A string representing a column name or a list of"""
2 changes: 1 addition & 1 deletion dlt/common/storages/file_storage.py
@@ -125,7 +125,7 @@ def has_folder(self, relative_path: str) -> bool:
return os.path.isdir(self.make_full_path(relative_path))

def list_folder_files(self, relative_path: str, to_root: bool = True) -> List[str]:
"""List all files in ``relative_path`` folder
"""List all files in `relative_path` folder
Args:
relative_path (str): A path to folder, relative to storage root
