Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial command and run_command file #1045

Closed
wants to merge 59 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
d339b6d
Add initial command and run_command file
sultaniman Mar 4, 2024
0aecb8b
MVP draft for dlt run
sultaniman Mar 12, 2024
6c36955
Initial implementation of runner usinf reflection and code generation
sultaniman Mar 13, 2024
b7fa238
Add more runtime information print outs
sultaniman Mar 14, 2024
f5218d9
Refactor cli runner implementation
sultaniman Mar 15, 2024
9e51900
Rename RunnerInventory to RunnerParams
sultaniman Mar 15, 2024
756ed57
Show warning if current directory is different from the one of pipeli…
sultaniman Mar 15, 2024
01f941d
Cleanup runner code
sultaniman Mar 15, 2024
6b1855a
Adjust pipeline run argument builder
sultaniman Mar 15, 2024
bc564e3
Add echo.error_style helper
sultaniman Mar 15, 2024
4f55031
Move preflight checks into inquiry helper
sultaniman Mar 15, 2024
584f4f2
Inspect module.__dict__ to extract objects instead of inspect module
sultaniman Mar 15, 2024
6ff3106
Adjust warning message
sultaniman Mar 15, 2024
4cc4f30
Check if selected resource has been called
sultaniman Mar 15, 2024
73c7db4
Update docstrings
sultaniman Mar 15, 2024
9cf559f
Adjust docstrings and prompt messages
sultaniman Mar 15, 2024
e456f26
Rename a variable
sultaniman Mar 15, 2024
ff464e3
Fix mypy errors
sultaniman Mar 18, 2024
66311e8
Fix linting issues
sultaniman Mar 18, 2024
3c8c77a
Add first test for run command
sultaniman Mar 18, 2024
9eb8b2a
Remove config path from params
sultaniman Mar 20, 2024
b3631f4
Create separate pipeline case for cli runner
sultaniman Mar 20, 2024
339fd63
Patch sys.argv instead of copying
sultaniman Mar 21, 2024
a35279b
Name variables properly and add docstrings to run_pipeline_command
sultaniman Mar 21, 2024
7cbc087
Extract reusable pieces
sultaniman Mar 21, 2024
1434c27
Refactor preflight checks
sultaniman Mar 22, 2024
bb608ef
Add dlt specific resource, source and pipeline alias by variable names
sultaniman Mar 22, 2024
9a1845a
Show pipeline name and source or resource name in case they were pass…
sultaniman Mar 22, 2024
11529b0
Fix mypy issues
sultaniman Mar 22, 2024
38aef78
Adjust error messages
sultaniman Mar 22, 2024
666879d
Add fmt.info_style helper
sultaniman Mar 22, 2024
3ef0ed9
Allow only bound resources and sources and simplify code
sultaniman Mar 22, 2024
6887397
Format code
sultaniman Mar 22, 2024
381a265
Adjust user messages
sultaniman Mar 22, 2024
06f3cf0
Format code
sultaniman Mar 22, 2024
29b5ec9
Cleanup code
sultaniman Mar 22, 2024
ed1fadc
Strip all load info access for pipeline.run calls
sultaniman Mar 22, 2024
2efc6c2
Adjust test pipeline
sultaniman Mar 22, 2024
7b48ed4
Adjust messages printed to users
sultaniman Mar 22, 2024
7b5dabd
Update tests
sultaniman Mar 22, 2024
2c972f0
Fix mypy issues
sultaniman Mar 22, 2024
2d2807b
Add more tests
sultaniman Mar 22, 2024
56e7e25
Allow running of pipeline when .dlt is missing
sultaniman Mar 22, 2024
b7e3b26
Adjust pipeline
sultaniman Mar 25, 2024
5f3aacf
Add more tests
sultaniman Mar 25, 2024
2ab7125
Remove redundant file
sultaniman Mar 25, 2024
b911408
Monkey patch pipeline.run while loading the pipeline script
sultaniman Mar 25, 2024
6f6577c
Add test pipeline script with immediate pipeline.run
sultaniman Mar 25, 2024
a5a10c6
Show RunnerError using fmt.error_style
sultaniman Mar 25, 2024
5726391
Keep pytest output clean
sultaniman Mar 25, 2024
dd3395c
Fix mypy warning
sultaniman Mar 25, 2024
0385ba3
Add more tests
sultaniman Mar 25, 2024
3ddd9fa
Test happy path and try to attach to pipeline once runner exits
sultaniman Mar 25, 2024
ca5aa99
Cleanup tests
sultaniman Mar 25, 2024
ce0f5bb
Add help message
sultaniman Mar 25, 2024
71c7c4a
Add simple validator for pipeline arguments
sultaniman Mar 25, 2024
26b43f3
Use raw formatter
sultaniman Mar 25, 2024
9d94feb
Check if data persisted after the pipeline run
sultaniman Mar 26, 2024
a193408
Add pipeline.run argument tests
sultaniman Mar 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 108 additions & 16 deletions dlt/cli/_dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
DEFAULT_VERIFIED_SOURCES_REPO,
)
from dlt.cli.pipeline_command import pipeline_command, DLT_PIPELINE_COMMAND_DOCS_URL
from dlt.cli.run_command import run_pipeline_command
from dlt.common.cli.runner.help import run_args_help
from dlt.cli.telemetry_command import (
DLT_TELEMETRY_DOCS_URL,
change_telemetry_status_command,
Expand Down Expand Up @@ -135,7 +137,11 @@ def deploy_command_wrapper(

@utils.track_command("pipeline", True, "operation")
def pipeline_command_wrapper(
operation: str, pipeline_name: str, pipelines_dir: str, verbosity: int, **command_kwargs: Any
operation: str,
pipeline_name: str,
pipelines_dir: str,
verbosity: int,
**command_kwargs: Any,
) -> int:
try:
pipeline_command(operation, pipeline_name, pipelines_dir, verbosity, **command_kwargs)
Expand Down Expand Up @@ -205,7 +211,11 @@ def __init__(
help: str = None, # noqa
) -> None:
super(TelemetryAction, self).__init__(
option_strings=option_strings, dest=dest, default=default, nargs=0, help=help
option_strings=option_strings,
dest=dest,
default=default,
nargs=0,
help=help,
)

def __call__(
Expand All @@ -230,7 +240,11 @@ def __init__(
help: str = None, # noqa
) -> None:
super(NonInteractiveAction, self).__init__(
option_strings=option_strings, dest=dest, default=default, nargs=0, help=help
option_strings=option_strings,
dest=dest,
default=default,
nargs=0,
help=help,
)

def __call__(
Expand All @@ -252,7 +266,11 @@ def __init__(
help: str = None, # noqa
) -> None:
super(DebugAction, self).__init__(
option_strings=option_strings, dest=dest, default=default, nargs=0, help=help
option_strings=option_strings,
dest=dest,
default=default,
nargs=0,
help=help,
)

def __call__(
Expand All @@ -273,7 +291,9 @@ def main() -> int:
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--version", action="version", version="%(prog)s {version}".format(version=__version__)
"--version",
action="version",
version="%(prog)s {version}".format(version=__version__),
)
parser.add_argument(
"--disable-telemetry",
Expand Down Expand Up @@ -366,7 +386,9 @@ def main() -> int:
"deploy", help="Creates a deployment package for a selected pipeline script"
)
deploy_cmd.add_argument(
"pipeline_script_path", metavar="pipeline-script-path", help="Path to a pipeline script"
"pipeline_script_path",
metavar="pipeline-script-path",
help="Path to a pipeline script",
)
deploy_sub_parsers = deploy_cmd.add_subparsers(dest="deployment_method")

Expand Down Expand Up @@ -423,25 +445,37 @@ def main() -> int:
)
deploy_cmd.add_argument("--help", "-h", nargs="?", const=True)
deploy_cmd.add_argument(
"pipeline_script_path", metavar="pipeline-script-path", nargs=argparse.REMAINDER
"pipeline_script_path",
metavar="pipeline-script-path",
nargs=argparse.REMAINDER,
)

schema = subparsers.add_parser("schema", help="Shows, converts and upgrades schemas")
schema.add_argument(
"file", help="Schema file name, in yaml or json format, will autodetect based on extension"
"file",
help="Schema file name, in yaml or json format, will autodetect based on extension",
)
schema.add_argument(
"--format", choices=["json", "yaml"], default="yaml", help="Display schema in this format"
"--format",
choices=["json", "yaml"],
default="yaml",
help="Display schema in this format",
)
schema.add_argument(
"--remove-defaults", action="store_true", help="Does not show default hint values"
"--remove-defaults",
action="store_true",
help="Does not show default hint values",
)

pipe_cmd = subparsers.add_parser(
"pipeline", help="Operations on pipelines that were ran locally"
)
pipe_cmd.add_argument(
"--list-pipelines", "-l", default=False, action="store_true", help="List local pipelines"
"--list-pipelines",
"-l",
default=False,
action="store_true",
help="List local pipelines",
)
pipe_cmd.add_argument(
"--hot-reload",
Expand All @@ -464,10 +498,12 @@ def main() -> int:

pipe_cmd_sync_parent = argparse.ArgumentParser(add_help=False)
pipe_cmd_sync_parent.add_argument(
"--destination", help="Sync from this destination when local pipeline state is missing."
"--destination",
help="Sync from this destination when local pipeline state is missing.",
)
pipe_cmd_sync_parent.add_argument(
"--dataset-name", help="Dataset name to sync from when local pipeline state is missing."
"--dataset-name",
help="Dataset name to sync from when local pipeline state is missing.",
)

pipeline_subparsers.add_parser(
Expand Down Expand Up @@ -510,7 +546,9 @@ def main() -> int:
help="Display schema in this format",
)
pipe_cmd_schema.add_argument(
"--remove-defaults", action="store_true", help="Does not show default hint values"
"--remove-defaults",
action="store_true",
help="Does not show default hint values",
)

pipe_cmd_drop = pipeline_subparsers.add_parser(
Expand Down Expand Up @@ -552,7 +590,8 @@ def main() -> int:
)

pipe_cmd_package = pipeline_subparsers.add_parser(
"load-package", help="Displays information on load package, use -v or -vv for more info"
"load-package",
help="Displays information on load package, use -v or -vv for more info",
)
pipe_cmd_package.add_argument(
"load_id",
Expand All @@ -563,6 +602,48 @@ def main() -> int:

subparsers.add_parser("telemetry", help="Shows telemetry status")

# CLI pipeline runner
run_cmd = subparsers.add_parser(
"run",
help="Run pipelines in a given directory",
formatter_class=argparse.RawTextHelpFormatter,
)

run_cmd.add_argument(
"module",
help="Path to module or python file with pipelines",
)

run_cmd.add_argument(
"pipeline_name",
type=str,
nargs="?",
help="Pipeline name",
)

run_cmd.add_argument(
"source",
type=str,
nargs="?",
help="Source or resource name",
)

# TODO: enable once pipeline.run with full refresh option is available
# run_cmd.add_argument(
# "--full-refresh",
# action="store_true",
# default=False,
# help="When used pipeline will run in full-refresh mode",
# )

run_cmd.add_argument(
"--args",
"-a",
nargs="+",
default=[],
help=run_args_help,
)

args = parser.parse_args()

if Venv.is_virtual_env() and not Venv.is_venv_activated():
Expand All @@ -575,6 +656,13 @@ def main() -> int:

if args.command == "schema":
return schema_command_wrapper(args.file, args.format, args.remove_defaults)
elif args.command == "run":
return run_pipeline_command(
args.module,
args.pipeline_name,
args.source,
args.args,
)
elif args.command == "pipeline":
if args.list_pipelines:
return pipeline_command_wrapper("list", "-", args.pipelines_dir, args.verbosity)
Expand All @@ -596,7 +684,11 @@ def main() -> int:
return -1
else:
return init_command_wrapper(
args.source, args.destination, args.generic, args.location, args.branch
args.source,
args.destination,
args.generic,
args.location,
args.branch,
)
elif args.command == "deploy":
try:
Expand Down
12 changes: 12 additions & 0 deletions dlt/cli/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,26 @@ def bold(msg: str) -> str:
return click.style(msg, bold=True, reset=True)


def info_style(msg: str) -> str:
return click.style(msg, fg="blue", reset=True)


def warning_style(msg: str) -> str:
return click.style(msg, fg="yellow", reset=True)


def error_style(msg: str) -> str:
return click.style(msg, fg="red", reset=True)


def error(msg: str) -> None:
click.secho("ERROR: " + msg, fg="red")


def info(msg: str) -> None:
click.secho("INFO: " + msg, fg="blue")


def warning(msg: str) -> None:
click.secho("WARNING: " + msg, fg="yellow")

Expand Down
51 changes: 51 additions & 0 deletions dlt/cli/run_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import os
from typing import List, Optional

from dlt.cli import echo as fmt
from dlt.cli.utils import track_command
from dlt.common.cli.runner.errors import FriendlyExit, PreflightError, RunnerError
from dlt.common.cli.runner.runner import PipelineRunner
from dlt.common.cli.runner.types import RunnerParams
from dlt.pipeline.exceptions import PipelineStepFailed


@track_command("run", False)
def run_pipeline_command(
module: str,
pipeline_name: Optional[str] = None,
source_name: Optional[str] = None,
args: Optional[List[str]] = None,
) -> int:
"""Run the given module if any pipeline and sources or resources exist in it

Args:
module (str): path to python module or file with dlt artifacts
pipeline_name (Optional[str]): Pipeline name
source_name (Optional[str]): Source or resource name
args (Optiona[List[str]]): List of arguments to `pipeline.run` method

Returns:
(int): exit code
"""
params = RunnerParams(
module,
current_dir=os.getcwd(),
pipeline_name=pipeline_name,
source_name=source_name,
args=args,
)

try:
with PipelineRunner(params=params) as runner:
load_info = runner.run()
fmt.echo("")
fmt.echo(load_info)
except PipelineStepFailed:
raise
except RunnerError as ex:
fmt.echo(fmt.error_style(ex.message))
return -1
except (FriendlyExit, PreflightError):
fmt.info("Stopping...")

return 0
Empty file added dlt/common/cli/__init__.py
Empty file.
10 changes: 10 additions & 0 deletions dlt/common/cli/runner/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from dlt.common.cli.runner.pipeline_script import PipelineScript
from dlt.common.cli.runner.runner import PipelineRunner
from dlt.common.cli.runner.types import PipelineMembers, RunnerParams

__all__ = (
"PipelineRunner",
"RunnerParams",
"PipelineMembers",
"PipelineScript",
)
12 changes: 12 additions & 0 deletions dlt/common/cli/runner/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
class RunnerError(Exception):
def __init__(self, message: str) -> None:
self.message = message
super().__init__(message)


class FriendlyExit(Exception):
pass


class PreflightError(Exception):
pass
24 changes: 24 additions & 0 deletions dlt/common/cli/runner/help.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing_extensions import get_args
from dlt.cli import echo as fmt
from dlt.common.destination.capabilities import TLoaderFileFormat
from dlt.common.schema.typing import TSchemaEvolutionMode, TWriteDisposition


supported_formats = "|".join(get_args(TLoaderFileFormat))
supported_evolution_modes = "|".join(get_args(TSchemaEvolutionMode))
supported_write_disposition = "|".join(get_args(TWriteDisposition))

run_args_help = "".join(
(
"Supported arguments passed to pipeline.run are",
fmt.info_style("\n - destination=string"),
fmt.info_style("\n - staging=string,"),
fmt.info_style("\n - credentials=string,"),
fmt.info_style("\n - table_name=string,"),
fmt.info_style(f"\n - write_disposition={supported_write_disposition},"),
fmt.info_style("\n - dataset_name=string,"),
fmt.info_style("\n - primary_key=string,"),
fmt.info_style(f"\n - schema_contract={supported_evolution_modes},"),
fmt.info_style(f"\n - loader_file_format={supported_formats}"),
)
)
Loading
Loading