Merge branch 'devel' into d#/data_contracts

rudolfix committed Nov 18, 2023
2 parents 340ed3d + 105795c commit 35c10b7
Showing 154 changed files with 1,500 additions and 1,190 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -23,7 +23,7 @@ help:
@echo " runs flake and mypy"
@echo " test"
@echo " tests all the components including destinations"
@echo " test-local"
@echo " test-load-local"
@echo " tests all components using local destinations: duckdb and postgres"
@echo " test-common"
@echo " tests common components"
2 changes: 2 additions & 0 deletions dlt/__init__.py
@@ -31,6 +31,7 @@
from dlt.extract.decorators import source, resource, transformer, defer
from dlt.pipeline import pipeline as _pipeline, run, attach, Pipeline, dbt, current as _current, mark as _mark
from dlt.pipeline import progress
from dlt import destinations

pipeline = _pipeline
current = _current
@@ -64,4 +65,5 @@
"TSecretValue",
"TCredentials",
"sources",
"destinations",
]
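
With destinations re-exported at the package root, destination implementations can be referenced as attributes of dlt rather than by string name alone. A minimal sketch, assuming the duckdb destination is available; the exact accepted forms depend on the release:

import dlt

# string reference, unchanged behavior
pipeline = dlt.pipeline(pipeline_name="demo", destination="duckdb", dataset_name="demo_data")

# the same destination addressed through the newly exported dlt.destinations
pipeline = dlt.pipeline(pipeline_name="demo", destination=dlt.destinations.duckdb, dataset_name="demo_data")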
57 changes: 39 additions & 18 deletions dlt/cli/_dlt.py
@@ -25,13 +25,22 @@
pass


DEBUG_FLAG = False


def on_exception(ex: Exception, info: str) -> None:
click.secho(str(ex), err=True, fg="red")
fmt.note("Please refer to %s for further assistance" % fmt.bold(info))
if DEBUG_FLAG:
raise ex


@utils.track_command("init", False, "source_name", "destination_name")
def init_command_wrapper(source_name: str, destination_name: str, use_generic_template: bool, repo_location: str, branch: str) -> int:
try:
init_command(source_name, destination_name, use_generic_template, repo_location, branch)
except Exception as ex:
click.secho(str(ex), err=True, fg="red")
fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_INIT_DOCS_URL))
on_exception(ex, DLT_INIT_DOCS_URL)
return -1
return 0

@@ -41,8 +50,7 @@ def list_verified_sources_command_wrapper(repo_location: str, branch: str) -> in
try:
list_verified_sources_command(repo_location, branch)
except Exception as ex:
click.secho(str(ex), err=True, fg="red")
fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_INIT_DOCS_URL))
on_exception(ex, DLT_INIT_DOCS_URL)
return -1
return 0

@@ -66,9 +74,8 @@ def deploy_command_wrapper(pipeline_script_path: str, deployment_method: str, re
**kwargs
)
except (CannotRestorePipelineException, PipelineWasNotRun) as ex:
click.secho(str(ex), err=True, fg="red")
fmt.note("You must run the pipeline locally successfully at least once in order to deploy it.")
fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_DEPLOY_DOCS_URL))
on_exception(ex, DLT_DEPLOY_DOCS_URL)
return -2
except InvalidGitRepositoryError:
click.secho(
@@ -89,10 +96,8 @@
)
return -4
except Exception as ex:
click.secho(str(ex), err=True, fg="red")
fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_DEPLOY_DOCS_URL))
on_exception(ex, DLT_DEPLOY_DOCS_URL)
return -5
# TODO: display stack trace if with debug flag
return 0


@@ -106,10 +111,10 @@ def pipeline_command_wrapper(
except CannotRestorePipelineException as ex:
click.secho(str(ex), err=True, fg="red")
click.secho("Try command %s to restore the pipeline state from destination" % fmt.bold(f"dlt pipeline {pipeline_name} sync"))
return 1
return -1
except Exception as ex:
click.secho(str(ex), err=True, fg="red")
return 1
on_exception(ex, DLT_PIPELINE_COMMAND_DOCS_URL)
return -2


@utils.track_command("schema", False, "operation")
@@ -133,8 +138,7 @@ def telemetry_status_command_wrapper() -> int:
try:
telemetry_status_command()
except Exception as ex:
click.secho(str(ex), err=True, fg="red")
fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_TELEMETRY_DOCS_URL))
on_exception(ex, DLT_TELEMETRY_DOCS_URL)
return -1
return 0

@@ -144,8 +148,7 @@ def telemetry_change_status_command_wrapper(enabled: bool) -> int:
try:
change_telemetry_status_command(enabled)
except Exception as ex:
click.secho(str(ex), err=True, fg="red")
fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_TELEMETRY_DOCS_URL))
on_exception(ex, DLT_TELEMETRY_DOCS_URL)
return -1
return 0

@@ -186,12 +189,28 @@ def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespac
fmt.ALWAYS_CHOOSE_DEFAULT = True


class DebugAction(argparse.Action):
def __init__(self, option_strings: Sequence[str], dest: Any = argparse.SUPPRESS, default: Any = argparse.SUPPRESS, help: str = None) -> None: # noqa
super(DebugAction, self).__init__(
option_strings=option_strings,
dest=dest,
default=default,
nargs=0,
help=help
)
def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str = None) -> None:
global DEBUG_FLAG
# will show stack traces (and maybe more debug things)
DEBUG_FLAG = True


def main() -> int:
parser = argparse.ArgumentParser(description="Creates, adds, inspects and deploys dlt pipelines.", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--version', action="version", version='%(prog)s {version}'.format(version=__version__))
parser.add_argument('--disable-telemetry', action=TelemetryAction, help="Disables telemetry before command is executed")
parser.add_argument('--enable-telemetry', action=TelemetryAction, help="Enables telemetry before command is executed")
parser.add_argument('--non-interactive', action=NonInteractiveAction, help="Non interactive mode. Default choices are automatically made for confirmations and prompts.")
parser.add_argument('--debug', action=DebugAction, help="Displays full stack traces on exceptions.")
subparsers = parser.add_subparsers(dest="command")

init_cmd = subparsers.add_parser("init", help="Creates a pipeline project in the current folder by adding existing verified source or creating a new one from template.")
@@ -239,8 +258,6 @@ def main() -> int:
pipe_cmd.add_argument("pipeline_name", nargs='?', help="Pipeline name")
pipe_cmd.add_argument("--pipelines-dir", help="Pipelines working directory", default=None)
pipe_cmd.add_argument("--verbose", "-v", action='count', default=0, help="Provides more information for certain commands.", dest="verbosity")
# pipe_cmd.add_argument("--dataset-name", help="Dataset name used to sync destination when local pipeline state is missing.")
# pipe_cmd.add_argument("--destination", help="Destination name used to sync when local pipeline state is missing.")

pipeline_subparsers = pipe_cmd.add_subparsers(dest="operation", required=False)

@@ -251,6 +268,7 @@ def main() -> int:
pipeline_subparsers.add_parser("info", help="Displays state of the pipeline, use -v or -vv for more info")
pipeline_subparsers.add_parser("show", help="Generates and launches Streamlit app with the loading status and dataset explorer")
pipeline_subparsers.add_parser("failed-jobs", help="Displays information on all the failed loads in all completed packages, failed jobs and associated error messages")
pipeline_subparsers.add_parser("drop-pending-packages", help="Deletes all extracted and normalized packages including those that are partially loaded.")
pipeline_subparsers.add_parser(
"sync",
help="Drops the local state of the pipeline and resets all the schemas and restores it from destination. The destination state, data and schemas are left intact.",
@@ -290,6 +308,9 @@ def main() -> int:
return pipeline_command_wrapper("list", "-", args.pipelines_dir, args.verbosity)
else:
command_kwargs = dict(args._get_kwargs())
if not command_kwargs.get("pipeline_name"):
pipe_cmd.print_usage()
return -1
command_kwargs['operation'] = args.operation or "info"
del command_kwargs["command"]
del command_kwargs["list_pipelines"]
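The consolidation above replaces repeated click.secho/fmt.note pairs with a single on_exception helper, while the new --debug option uses a zero-argument argparse action to flip a module-level switch that makes the handlers re-raise instead of swallowing tracebacks. A self-contained sketch of that pattern, with names mirroring the diff (not the actual dlt module):

import argparse

DEBUG_FLAG = False  # module-level switch, as in _dlt.py

class DebugAction(argparse.Action):
    # flag action that takes no value; its only job is to flip DEBUG_FLAG
    def __init__(self, option_strings, dest=argparse.SUPPRESS, default=argparse.SUPPRESS, help=None):
        super().__init__(option_strings=option_strings, dest=dest, default=default, nargs=0, help=help)

    def __call__(self, parser, namespace, values, option_string=None):
        global DEBUG_FLAG
        DEBUG_FLAG = True  # downstream handlers re-raise when this is set

def on_exception(ex: Exception, info: str) -> None:
    print("ERROR: %s (see %s)" % (ex, info))
    if DEBUG_FLAG:
        raise ex  # surface the full stack trace in debug mode

parser = argparse.ArgumentParser()
parser.add_argument("--debug", action=DebugAction, help="Displays full stack traces on exceptions.")
parser.parse_args(["--debug"])
assert DEBUG_FLAG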
4 changes: 2 additions & 2 deletions dlt/cli/deploy_command.py
@@ -16,7 +16,7 @@

from dlt.version import DLT_PKG_NAME

from dlt.common.destination.reference import DestinationReference
from dlt.common.destination.reference import Destination

REQUIREMENTS_GITHUB_ACTION = "requirements_github_action.txt"
DLT_DEPLOY_DOCS_URL = "https://dlthub.com/docs/walkthroughs/deploy-a-pipeline"
@@ -198,7 +198,7 @@ def __init__(
def _generate_workflow(self, *args: Optional[Any]) -> None:
self.deployment_method = DeploymentMethods.airflow_composer.value

req_dep = f"{DLT_PKG_NAME}[{DestinationReference.to_name(self.state['destination'])}]"
req_dep = f"{DLT_PKG_NAME}[{Destination.to_name(self.state['destination'])}]"
req_dep_line = f"{req_dep}>={pkg_version(DLT_PKG_NAME)}"

self.artifacts["requirements_txt"] = req_dep_line
6 changes: 3 additions & 3 deletions dlt/cli/init_command.py
@@ -12,7 +12,7 @@
from dlt.common.pipeline import get_dlt_repos_dir
from dlt.common.source import _SOURCES
from dlt.version import DLT_PKG_NAME, __version__
from dlt.common.destination import DestinationReference
from dlt.common.destination import Destination
from dlt.common.reflection.utils import rewrite_python_script
from dlt.common.schema.utils import is_valid_schema_name
from dlt.common.schema.exceptions import InvalidSchemaName
@@ -160,8 +160,8 @@ def list_verified_sources_command(repo_location: str, branch: str = None) -> Non

def init_command(source_name: str, destination_name: str, use_generic_template: bool, repo_location: str, branch: str = None) -> None:
# try to import the destination and get config spec
destination_reference = DestinationReference.from_name(destination_name)
destination_spec = destination_reference.spec()
destination_reference = Destination.from_reference(destination_name)
destination_spec = destination_reference.spec

fmt.echo("Looking up the init scripts in %s..." % fmt.bold(repo_location))
clone_storage = git.get_fresh_repo_files(repo_location, get_dlt_repos_dir(), branch=branch)
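Both CLI modules now resolve destinations through the new Destination entry point, and spec becomes a property instead of a spec() call. A minimal sketch of the lookup, using duckdb as a stand-in for any built-in destination name:

from dlt.common.destination import Destination

dest = Destination.from_reference("duckdb")  # replaces DestinationReference.from_name
spec = dest.spec  # configuration spec, now an attribute rather than spec()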
42 changes: 29 additions & 13 deletions dlt/cli/pipeline_command.py
@@ -1,5 +1,5 @@
import yaml
from typing import Any
from typing import Any, Sequence, Tuple
import dlt
from dlt.cli.exceptions import CliCommandException

@@ -9,8 +9,7 @@
from dlt.common.runners import Venv
from dlt.common.runners.stdout import iter_stdout
from dlt.common.schema.utils import group_tables_by_resource, remove_defaults
from dlt.common.storages.file_storage import FileStorage
from dlt.common.typing import DictStrAny
from dlt.common.storages import FileStorage, LoadStorage
from dlt.pipeline.helpers import DropCommand
from dlt.pipeline.exceptions import CannotRestorePipelineException

@@ -33,6 +32,8 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
return

try:
if verbosity > 0:
fmt.echo("Attaching to pipeline %s" % fmt.bold(pipeline_name))
p = dlt.attach(pipeline_name=pipeline_name, pipelines_dir=pipelines_dir)
except CannotRestorePipelineException as e:
if operation not in {"sync", "drop"}:
@@ -52,6 +53,22 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
if operation == "sync":
return # No need to sync again

def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
extracted_files = p.list_extracted_resources()
if extracted_files:
fmt.echo("Has %s extracted files ready to be normalized" % fmt.bold(str(len(extracted_files))))
norm_packages = p.list_normalized_load_packages()
if norm_packages:
fmt.echo("Has %s load packages ready to be loaded with following load ids:" % fmt.bold(str(len(norm_packages))))
for load_id in norm_packages:
fmt.echo(load_id)
# load first (oldest) package
first_package_info = p.get_load_package_info(norm_packages[0])
if LoadStorage.is_package_partially_loaded(first_package_info):
fmt.warning("This package is partially loaded. Data in the destination may be modified.")
fmt.echo()
return extracted_files, norm_packages

fmt.echo("Found pipeline %s in %s" % (fmt.bold(p.pipeline_name), fmt.bold(p.pipelines_dir)))

if operation == "show":
@@ -102,15 +119,7 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
fmt.echo("%s with %s table(s) and %s resource state slot(s)" % (fmt.bold(resource_name), fmt.bold(str(len(tables))), fmt.bold(str(res_state_slots))))
fmt.echo()
fmt.echo("Working dir content:")
extracted_files = p.list_extracted_resources()
if extracted_files:
fmt.echo("Has %s extracted files ready to be normalized" % fmt.bold(str(len(extracted_files))))
norm_packages = p.list_normalized_load_packages()
if norm_packages:
fmt.echo("Has %s load packages ready to be loaded with following load ids:" % fmt.bold(str(len(norm_packages))))
for load_id in norm_packages:
fmt.echo(load_id)
fmt.echo()
_display_pending_packages()
loaded_packages = p.list_completed_load_packages()
if loaded_packages:
fmt.echo("Has %s completed load packages with following load ids:" % fmt.bold(str(len(loaded_packages))))
@@ -148,6 +157,13 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
else:
fmt.echo("No failed jobs found")

if operation == "drop-pending-packages":
extracted_files, norm_packages = _display_pending_packages()
if len(extracted_files) == 0 and len(norm_packages) == 0:
fmt.echo("No pending packages found")
if fmt.confirm("Delete the above packages?", default=False):
p.drop_pending_packages(with_partial_loads=True)
fmt.echo("Pending packages deleted")

if operation == "sync":
if fmt.confirm("About to drop the local state of the pipeline and reset all the schemas. The destination state, data and schemas are left intact. Proceed?", default=False):
@@ -196,7 +212,7 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
fmt.warning(warning)
return

fmt.echo("About to drop the following data in dataset %s in destination %s:" % (fmt.bold(drop.info["dataset_name"]), fmt.bold(p.destination.__name__)))
fmt.echo("About to drop the following data in dataset %s in destination %s:" % (fmt.bold(drop.info["dataset_name"]), fmt.bold(p.destination.name)))
fmt.echo("%s: %s" % (fmt.style("Selected schema", fg="green"), drop.info["schema_name"]))
fmt.echo("%s: %s" % (fmt.style("Selected resource(s)", fg="green"), drop.info["resource_names"]))
fmt.echo("%s: %s" % (fmt.style("Table(s) to drop", fg="green"), drop.info["tables"]))
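The new drop-pending-packages operation combines _display_pending_packages with Pipeline.drop_pending_packages. A rough programmatic equivalent assembled from the calls visible in this diff; the pipeline name is hypothetical:

import dlt
from dlt.common.storages import LoadStorage

p = dlt.attach(pipeline_name="my_pipeline")
extracted = p.list_extracted_resources()
normalized = p.list_normalized_load_packages()
if normalized:
    # warn when the oldest package has already written data to the destination
    info = p.get_load_package_info(normalized[0])
    if LoadStorage.is_package_partially_loaded(info):
        print("oldest package is partially loaded; destination data may be modified")
if extracted or normalized:
    p.drop_pending_packages(with_partial_loads=True)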
16 changes: 11 additions & 5 deletions dlt/common/configuration/inject.py
@@ -32,7 +32,8 @@ def with_config(
sections: Tuple[str, ...] = (),
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True
include_defaults: bool = True,
accept_partial: bool = False,
) -> TFun:
...

Expand All @@ -45,7 +46,8 @@ def with_config(
sections: Tuple[str, ...] = (),
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True
include_defaults: bool = True,
accept_partial: bool = False,
) -> Callable[[TFun], TFun]:
...

Expand All @@ -57,7 +59,9 @@ def with_config(
sections: Tuple[str, ...] = (),
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True
include_defaults: bool = True,
accept_partial: bool = False,
initial_config: Optional[BaseConfiguration] = None,
) -> Callable[[TFun], TFun]:
"""Injects values into decorated function arguments following the specification in `spec` or by deriving one from function's signature.
@@ -127,7 +131,9 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
curr_sections = sections

# if one of arguments is spec the use it as initial value
if spec_arg:
if initial_config:
config = initial_config
elif spec_arg:
config = bound_args.arguments.get(spec_arg.name, None)
# resolve SPEC, also provide section_context with pipeline_name
if pipeline_name_arg:
@@ -139,7 +145,7 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
with _RESOLVE_LOCK:
with inject_section(section_context):
# print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}")
config = resolve_configuration(config or SPEC(), explicit_value=bound_args.arguments)
config = resolve_configuration(config or SPEC(), explicit_value=bound_args.arguments, accept_partial=accept_partial)
resolved_params = dict(config)
# overwrite or add resolved params
for p in sig.parameters.values():
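with_config gains two knobs: accept_partial is forwarded to resolve_configuration so decorated functions can run with a partially resolved spec, and initial_config, when given, takes precedence over a spec passed in the arguments as the starting configuration. A sketch of the former; section and parameter names are hypothetical:

import dlt
from dlt.common.configuration.inject import with_config

@with_config(sections=("my_tool",), accept_partial=True)
def connect(host: str = dlt.config.value, api_key: str = dlt.secrets.value) -> None:
    # with accept_partial=True, resolution is not expected to raise when some
    # required values are missing (behavior inferred from the added parameter)
    print(host, api_key)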
5 changes: 3 additions & 2 deletions dlt/common/destination/__init__.py
@@ -1,10 +1,11 @@
from dlt.common.destination.capabilities import DestinationCapabilitiesContext, TLoaderFileFormat, ALL_SUPPORTED_FILE_FORMATS
from dlt.common.destination.reference import DestinationReference, TDestinationReferenceArg
from dlt.common.destination.reference import TDestinationReferenceArg, Destination, TDestination

__all__ = [
"DestinationCapabilitiesContext",
"TLoaderFileFormat",
"ALL_SUPPORTED_FILE_FORMATS",
"DestinationReference",
"TDestinationReferenceArg",
"Destination",
"TDestination",
]
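
Downstream imports change accordingly: DestinationReference leaves the public surface while Destination and the TDestination alias replace it.

# old: from dlt.common.destination import DestinationReference
from dlt.common.destination import Destination, TDestination, TDestinationReferenceArg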
(Diffs for the remaining changed files were not loaded.)