Skip to content

Commit

Permalink
add concept of single file templates in the core
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Sep 5, 2024
1 parent 2694624 commit d6b70bc
Show file tree
Hide file tree
Showing 15 changed files with 204 additions and 123 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test_common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ jobs:


# TODO: this is needed for the filesystem tests, not sure if this should be in an extra?
- name: Install pipeline and sources dependencies
- name: Install openpyxl for excel tests
run: pip install openpyxl

- run: |
Expand Down
13 changes: 0 additions & 13 deletions dlt/cli/_dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def on_exception(ex: Exception, info: str) -> None:
def init_command_wrapper(
source_name: str,
destination_type: str,
use_generic_template: bool,
repo_location: str,
branch: str,
omit_core_sources: bool = False,
Expand All @@ -63,7 +62,6 @@ def init_command_wrapper(
init_command(
source_name,
destination_type,
use_generic_template,
repo_location,
branch,
omit_core_sources,
Expand Down Expand Up @@ -342,16 +340,6 @@ def main() -> int:
default=None,
help="Advanced. Uses specific branch of the init repository to fetch the template.",
)
init_cmd.add_argument(
"--generic",
default=False,
action="store_true",
help=(
"When present uses a generic template with all the dlt loading code present will be"
" used. Otherwise a debug template is used that can be immediately run to get familiar"
" with the dlt sources."
),
)

init_cmd.add_argument(
"--omit-core-sources",
Expand Down Expand Up @@ -616,7 +604,6 @@ def main() -> int:
return init_command_wrapper(
args.source,
args.destination,
args.generic,
args.location,
args.branch,
args.omit_core_sources,
Expand Down
118 changes: 62 additions & 56 deletions dlt/cli/init_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from dlt.common.schema.utils import is_valid_schema_name
from dlt.common.schema.exceptions import InvalidSchemaName
from dlt.common.storages.file_storage import FileStorage
from dlt.sources import init as init_module
from dlt.sources import pipeline_templates as init_module

import dlt.reflection.names as n
from dlt.reflection.script_inspector import inspect_pipeline_script, load_script_module
Expand All @@ -44,19 +44,34 @@

DLT_INIT_DOCS_URL = "https://dlthub.com/docs/reference/command-line-interface#dlt-init"
DEFAULT_VERIFIED_SOURCES_REPO = "https://github.com/dlt-hub/verified-sources.git"
INIT_MODULE_NAME = "init"
TEMPLATES_MODULE_NAME = "pipeline_templates"
SOURCES_MODULE_NAME = "sources"


def _get_template_files(
command_module: ModuleType, use_generic_template: bool
) -> Tuple[str, List[str]]:
template_files: List[str] = command_module.TEMPLATE_FILES
pipeline_script: str = command_module.PIPELINE_SCRIPT
if use_generic_template:
pipeline_script, py = os.path.splitext(pipeline_script)
pipeline_script = f"{pipeline_script}_generic{py}"
return pipeline_script, template_files
def _get_core_sources_storage() -> FileStorage:
"""Get FileStorage for core sources"""
local_path = Path(os.path.dirname(os.path.realpath(__file__))).parent / SOURCES_MODULE_NAME
return FileStorage(str(local_path))


def _get_templates_storage() -> FileStorage:
"""Get FileStorage for single file templates"""
# look up init storage in core
init_path = (
Path(os.path.dirname(os.path.realpath(__file__))).parent
/ SOURCES_MODULE_NAME
/ TEMPLATES_MODULE_NAME
)
return FileStorage(str(init_path))


def _clone_and_get_verified_sources_storage(repo_location: str, branch: str = None) -> FileStorage:
"""Clone and get FileStorage for verified sources templates"""

fmt.echo("Looking up verified sources at %s..." % fmt.bold(repo_location))
clone_storage = git.get_fresh_repo_files(repo_location, get_dlt_repos_dir(), branch=branch)
# copy dlt source files from here
return FileStorage(clone_storage.make_full_path(SOURCES_MODULE_NAME))


def _select_source_files(
Expand Down Expand Up @@ -131,9 +146,16 @@ def _get_dependency_system(dest_storage: FileStorage) -> str:
return None


def _list_template_sources() -> Dict[str, SourceConfiguration]:
template_storage = _get_templates_storage()
sources: Dict[str, SourceConfiguration] = {}
for source_name in files_ops.get_sources_names(template_storage, source_type="template"):
sources[source_name] = files_ops.get_template_configuration(template_storage, source_name)
return sources


def _list_core_sources() -> Dict[str, SourceConfiguration]:
local_path = Path(os.path.dirname(os.path.realpath(__file__))).parent / SOURCES_MODULE_NAME
core_sources_storage = FileStorage(str(local_path))
core_sources_storage = _get_core_sources_storage()

sources: Dict[str, SourceConfiguration] = {}
for source_name in files_ops.get_sources_names(core_sources_storage, source_type="core"):
Expand All @@ -146,14 +168,15 @@ def _list_core_sources() -> Dict[str, SourceConfiguration]:
def _list_verified_sources(
repo_location: str, branch: str = None
) -> Dict[str, SourceConfiguration]:
clone_storage = git.get_fresh_repo_files(repo_location, get_dlt_repos_dir(), branch=branch)
sources_storage = FileStorage(clone_storage.make_full_path(SOURCES_MODULE_NAME))
verified_sources_storage = _clone_and_get_verified_sources_storage(repo_location, branch)

sources: Dict[str, SourceConfiguration] = {}
for source_name in files_ops.get_sources_names(sources_storage, source_type="verified"):
for source_name in files_ops.get_sources_names(
verified_sources_storage, source_type="verified"
):
try:
sources[source_name] = files_ops.get_verified_source_configuration(
sources_storage, source_name
verified_sources_storage, source_name
)
except Exception as ex:
fmt.warning(f"Verified source {source_name} not available: {ex}")
Expand All @@ -169,7 +192,7 @@ def _welcome_message(
is_new_source: bool,
) -> None:
fmt.echo()
if source_configuration.source_type in ["generic", "core"]:
if source_configuration.source_type in ["template", "core"]:
fmt.echo("Your new pipeline %s is ready to be customized!" % fmt.bold(source_name))
fmt.echo(
"* Review and change how dlt loads your data in %s"
Expand Down Expand Up @@ -247,15 +270,22 @@ def list_sources_command(repo_location: str, branch: str = None) -> None:
fmt.echo(msg)

fmt.echo("---")
fmt.echo("Looking up verified sources at %s..." % fmt.bold(repo_location))
fmt.echo("Available dlt single file templates:")
fmt.echo("---")
template_sources = _list_template_sources()
for source_name, source_configuration in template_sources.items():
msg = "%s: %s" % (fmt.bold(source_name), source_configuration.doc)
fmt.echo(msg)

fmt.echo("---")
fmt.echo("Available verified sources:")
fmt.echo("---")
for source_name, source_configuration in _list_verified_sources(repo_location, branch).items():
reqs = source_configuration.requirements
dlt_req_string = str(reqs.dlt_requirement_base)
msg = "%s:" % (fmt.bold(source_name))
msg = "%s: " % (fmt.bold(source_name))
if source_name in core_sources.keys():
msg += " (Deprecated since dlt 1.0.0 in favor of core source of the same name) "
msg += "(Deprecated since dlt 1.0.0 in favor of core source of the same name) "
msg += source_configuration.doc
if not reqs.is_installed_dlt_compatible():
msg += fmt.warning_style(" [needs update: %s]" % (dlt_req_string))
Expand All @@ -266,7 +296,6 @@ def list_sources_command(repo_location: str, branch: str = None) -> None:
def init_command(
source_name: str,
destination_type: str,
use_generic_template: bool,
repo_location: str,
branch: str = None,
omit_core_sources: bool = False,
Expand All @@ -275,38 +304,25 @@ def init_command(
destination_reference = Destination.from_reference(destination_type)
destination_spec = destination_reference.spec

# lookup core sources
local_path = Path(os.path.dirname(os.path.realpath(__file__))).parent / SOURCES_MODULE_NAME
core_sources_storage = FileStorage(str(local_path))
# lookup core storages
core_sources_storage = _get_core_sources_storage()
templates_storage = _get_templates_storage()

# discover type of source
source_type: files_ops.TSourceType = "generic"
source_type: files_ops.TSourceType = "template"
if (
source_name in files_ops.get_sources_names(core_sources_storage, source_type="core")
) and not omit_core_sources:
source_type = "core"
else:
if omit_core_sources:
fmt.echo("Omitting dlt core sources.")
fmt.echo("Looking up verified sources at %s..." % fmt.bold(repo_location))
clone_storage = git.get_fresh_repo_files(repo_location, get_dlt_repos_dir(), branch=branch)
# copy dlt source files from here
verified_sources_storage = FileStorage(clone_storage.make_full_path(SOURCES_MODULE_NAME))
verified_sources_storage = _clone_and_get_verified_sources_storage(repo_location, branch)
if source_name in files_ops.get_sources_names(
verified_sources_storage, source_type="verified"
):
source_type = "verified"

# look up init storage in core
init_path = (
Path(os.path.dirname(os.path.realpath(__file__))).parent
/ SOURCES_MODULE_NAME
/ INIT_MODULE_NAME
)

pipeline_script, template_files = _get_template_files(init_module, use_generic_template)
init_storage = FileStorage(str(init_path))

# prepare destination storage
dest_storage = FileStorage(os.path.abspath("."))
if not dest_storage.has_folder(get_dlt_settings_dir()):
Expand Down Expand Up @@ -360,7 +376,7 @@ def init_command(
" update correctly in the future."
)
# add template files
source_configuration.files.extend(template_files)
source_configuration.files.extend(files_ops.TEMPLATE_FILES)

else:
if source_type == "core":
Expand All @@ -370,15 +386,8 @@ def init_command(
else:
if not is_valid_schema_name(source_name):
raise InvalidSchemaName(source_name)
source_configuration = SourceConfiguration(
source_type,
"pipeline",
init_storage,
pipeline_script,
source_name + "_pipeline.py",
template_files,
SourceRequirements([]),
"",
source_configuration = files_ops.get_template_configuration(
templates_storage, source_name
)

if dest_storage.has_file(source_configuration.dest_pipeline_script):
Expand Down Expand Up @@ -453,7 +462,7 @@ def init_command(
)

# detect all the required secrets and configs that should go into tomls files
if source_configuration.source_type == "generic":
if source_configuration.source_type == "template":
# replace destination, pipeline_name and dataset_name in templates
transformed_nodes = source_detection.find_call_arguments_to_replace(
visitor,
Expand Down Expand Up @@ -542,9 +551,6 @@ def init_command(
" available sources."
)

if use_generic_template and source_configuration.source_type != "generic":
fmt.warning("The --generic parameter is discarded if a source is found.")

if not fmt.confirm("Do you want to proceed?", default=True):
raise CliCommandException("init", "Aborted")

Expand All @@ -557,11 +563,11 @@ def init_command(
for file_name in source_configuration.files:
dest_path = dest_storage.make_full_path(file_name)
# get files from init section first
if init_storage.has_file(file_name):
if templates_storage.has_file(file_name):
if dest_storage.has_file(dest_path):
# do not overwrite any init files
continue
src_path = init_storage.make_full_path(file_name)
src_path = templates_storage.make_full_path(file_name)
else:
# only those that were modified should be copied from verified sources
if file_name in remote_modified:
Expand Down
41 changes: 39 additions & 2 deletions dlt/cli/pipeline_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from dlt.cli import utils
from dlt.cli.requirements import SourceRequirements

TSourceType = Literal["core", "verified", "generic"]
TSourceType = Literal["core", "verified", "template"]

SOURCES_INIT_INFO_ENGINE_VERSION = 1
SOURCES_INIT_INFO_FILE = ".sources"
Expand All @@ -26,8 +26,13 @@
".*",
"_*",
"helpers",
"init",
"pipeline_templates",
]
PIPELINE_FILE_SUFFIX = "_pipeline.py"

# hardcode default template files here
TEMPLATE_FILES = [".gitignore", ".dlt/config.toml", ".dlt/secrets.toml"]
DEFAULT_PIPELINE_TEMPLATE = "default_pipeline.py"


class SourceConfiguration(NamedTuple):
Expand Down Expand Up @@ -157,6 +162,14 @@ def get_remote_source_index(

def get_sources_names(sources_storage: FileStorage, source_type: TSourceType) -> List[str]:
candidates: List[str] = []

# for the templates we just find all the filenames
if source_type == "template":
for name in sources_storage.list_folder_files(".", to_root=False):
if name.endswith(PIPELINE_FILE_SUFFIX):
candidates.append(name.replace(PIPELINE_FILE_SUFFIX, ""))
return candidates

ignore_cases = IGNORE_VERIFIED_SOURCES if source_type == "verified" else IGNORE_CORE_SOURCES
for name in [
n
Expand All @@ -180,6 +193,30 @@ def _get_docstring_for_module(sources_storage: FileStorage, source_name: str) ->
return docstring


def get_template_configuration(
sources_storage: FileStorage, source_name: str
) -> SourceConfiguration:
destination_pipeline_file_name = source_name + PIPELINE_FILE_SUFFIX
source_pipeline_file_name = destination_pipeline_file_name

if not sources_storage.has_file(source_pipeline_file_name):
source_pipeline_file_name = DEFAULT_PIPELINE_TEMPLATE

docstring = get_module_docstring(sources_storage.load(source_pipeline_file_name))
if docstring:
docstring = docstring.splitlines()[0]
return SourceConfiguration(
"template",
"pipeline",
sources_storage,
source_pipeline_file_name,
destination_pipeline_file_name,
TEMPLATE_FILES,
SourceRequirements([]),
docstring,
)


def get_core_source_configuration(
sources_storage: FileStorage, source_name: str
) -> SourceConfiguration:
Expand Down
4 changes: 0 additions & 4 deletions dlt/sources/init/__init__.py

This file was deleted.

File renamed without changes.
File renamed without changes.
Empty file.
Loading

0 comments on commit d6b70bc

Please sign in to comment.