Skip to content

Commit

Permalink
removes dlt init flag to skip core sources, adds flag to eject core s…
Browse files Browse the repository at this point in the history
…ource
  • Loading branch information
rudolfix committed Dec 16, 2024
1 parent d81a35b commit c0c4f04
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 59 deletions.
4 changes: 2 additions & 2 deletions dlt/cli/command_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ def init_command_wrapper(
destination_type: str,
repo_location: str,
branch: str,
omit_core_sources: bool = False,
eject_source: bool = False,
) -> None:
init_command(
source_name,
destination_type,
repo_location,
branch,
omit_core_sources,
eject_source,
)


Expand Down
31 changes: 18 additions & 13 deletions dlt/cli/init_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def _list_core_sources() -> Dict[str, SourceConfiguration]:
sources: Dict[str, SourceConfiguration] = {}
for source_name in files_ops.get_sources_names(core_sources_storage, source_type="core"):
sources[source_name] = files_ops.get_core_source_configuration(
core_sources_storage, source_name
core_sources_storage, source_name, eject_source=False
)
return sources

Expand Down Expand Up @@ -295,7 +295,7 @@ def init_command(
destination_type: str,
repo_location: str,
branch: str = None,
omit_core_sources: bool = False,
eject_source: bool = False,
) -> None:
# try to import the destination and get config spec
destination_reference = Destination.from_reference(destination_type)
Expand All @@ -310,13 +310,9 @@ def init_command(

# discover type of source
source_type: files_ops.TSourceType = "template"
if (
source_name in files_ops.get_sources_names(core_sources_storage, source_type="core")
) and not omit_core_sources:
if source_name in files_ops.get_sources_names(core_sources_storage, source_type="core"):
source_type = "core"
else:
if omit_core_sources:
fmt.echo("Omitting dlt core sources.")
verified_sources_storage = _clone_and_get_verified_sources_storage(repo_location, branch)
if source_name in files_ops.get_sources_names(
verified_sources_storage, source_type="verified"
Expand Down Expand Up @@ -380,7 +376,7 @@ def init_command(
else:
if source_type == "core":
source_configuration = files_ops.get_core_source_configuration(
core_sources_storage, source_name
core_sources_storage, source_name, eject_source
)
from importlib.metadata import Distribution

Expand All @@ -392,6 +388,9 @@ def init_command(

if canonical_source_name in extras:
source_configuration.requirements.update_dlt_extras(canonical_source_name)

# create remote modified index to copy files when ejecting
remote_modified = {file_name: None for file_name in source_configuration.files}
else:
if not is_valid_schema_name(source_name):
raise InvalidSchemaName(source_name)
Expand Down Expand Up @@ -536,11 +535,17 @@ def init_command(
"Creating a new pipeline with the dlt core source %s (%s)"
% (fmt.bold(source_name), source_configuration.doc)
)
fmt.echo(
"NOTE: Beginning with dlt 1.0.0, the source %s will no longer be copied from the"
" verified sources repo but imported from dlt.sources. You can provide the"
" --omit-core-sources flag to revert to the old behavior." % (fmt.bold(source_name))
)
if eject_source:
fmt.echo(
"NOTE: Source code of %s will be ejected. Remember to modify the pipeline "
"example script to import the ejected source." % (fmt.bold(source_name))
)
else:
fmt.echo(
"NOTE: Beginning with dlt 1.0.0, the source %s will no longer be copied from"
" the verified sources repo but imported from dlt.sources. You can provide the"
" --eject flag to revert to the old behavior." % (fmt.bold(source_name))
)
elif source_configuration.source_type == "verified":
fmt.echo(
"Creating and configuring a new pipeline with the verified source %s (%s)"
Expand Down
40 changes: 23 additions & 17 deletions dlt/cli/pipeline_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,19 +226,39 @@ def get_template_configuration(
)


def _get_source_files(sources_storage: FileStorage, source_name: str) -> List[str]:
    """Collect paths of all files of source `source_name`, skipping `IGNORE_FILES` matches.

    Walks the folder `source_name` inside `sources_storage` recursively. Folders whose
    name matches any pattern in `IGNORE_FILES` are not descended into, files whose name
    matches are left out.

    Args:
        sources_storage: storage that contains the source folder
        source_name: name of the source folder to walk

    Returns:
        File paths built from `sources_storage.to_relative_path` of each visited folder
        joined with the file name, in the order `os.walk` yields them.
    """

    def _is_ignored(name: str) -> bool:
        # a single matching glob pattern is enough to exclude the entry
        return any(fnmatch.fnmatch(name, pattern) for pattern in IGNORE_FILES)

    collected: List[str] = []
    source_root = sources_storage.make_full_path(source_name)
    for current_dir, dir_names, file_names in os.walk(source_root):
        # prune in place: os.walk (top-down) only descends into folders left in this list
        dir_names[:] = [name for name in dir_names if not _is_ignored(name)]
        relative_dir = sources_storage.to_relative_path(current_dir)
        for file_name in file_names:
            if not _is_ignored(file_name):
                collected.append(os.path.join(relative_dir, file_name))
    return collected


def get_core_source_configuration(
sources_storage: FileStorage, source_name: str
sources_storage: FileStorage, source_name: str, eject_source: bool
) -> SourceConfiguration:
src_pipeline_file = CORE_SOURCE_TEMPLATE_MODULE_NAME + "/" + source_name + PIPELINE_FILE_SUFFIX
dest_pipeline_file = source_name + PIPELINE_FILE_SUFFIX
files: List[str] = _get_source_files(sources_storage, source_name) if eject_source else []

return SourceConfiguration(
"core",
"dlt.sources." + source_name,
sources_storage,
src_pipeline_file,
dest_pipeline_file,
[".gitignore"],
files,
SourceRequirements([]),
_get_docstring_for_module(sources_storage, source_name),
False,
Expand All @@ -259,21 +279,7 @@ def get_verified_source_configuration(
f"Pipeline example script {example_script} could not be found in the repository",
source_name,
)
# get all files recursively
files: List[str] = []
for root, subdirs, _files in os.walk(sources_storage.make_full_path(source_name)):
# filter unwanted files
for subdir in list(subdirs):
if any(fnmatch.fnmatch(subdir, ignore) for ignore in IGNORE_FILES):
subdirs.remove(subdir)
rel_root = sources_storage.to_relative_path(root)
files.extend(
[
os.path.join(rel_root, file)
for file in _files
if all(not fnmatch.fnmatch(file, ignore) for ignore in IGNORE_FILES)
]
)
files = _get_source_files(sources_storage, source_name)
# read requirements
requirements_path = os.path.join(source_name, utils.REQUIREMENTS_TXT)
if sources_storage.has_file(requirements_path):
Expand Down
10 changes: 3 additions & 7 deletions dlt/cli/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,10 @@ def configure_parser(self, parser: argparse.ArgumentParser) -> None:
)

parser.add_argument(
"--omit-core-sources",
"--eject",
default=False,
action="store_true",
help=(
"When present, will not create the new pipeline with a core source of the given"
" name but will take a source of this name from the default or provided"
" location."
),
help="Ejects the source code of the core source like sql_database",
)

def execute(self, args: argparse.Namespace) -> None:
Expand All @@ -107,7 +103,7 @@ def execute(self, args: argparse.Namespace) -> None:
args.destination,
args.location,
args.branch,
args.omit_core_sources,
args.eject,
)


Expand Down
13 changes: 13 additions & 0 deletions docs/website/docs/reference/command-line-interface.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,22 @@ This command creates a new dlt pipeline script that loads data from `source` to
This command can be used several times in the same folder to add more sources, destinations, and pipelines. It will also update the verified source code to the newest
version if run again with an existing `source` name. You are warned if files will be overwritten or if the `dlt` version needs an upgrade to run a particular pipeline.

### Ejecting the source code of core sources like `sql_database`
A few sources have been merged into the core library. You can still eject their source code and modify it by using the `--eject` flag:
```sh
dlt init sql_database duckdb --eject
```
This command copies the source code of `sql_database` into your project. Remember to modify the pipeline example script so that it imports the source from the local folder!

### Specify your own "verified sources" repository
You can use the `--location <repo_url or local folder>` option to specify your own repository with sources. Typically, you would [fork ours](https://github.com/dlt-hub/verified-sources) and start customizing and adding sources, e.g., to use them for your team or organization. You can also specify a branch with `--branch <name>`, e.g., to test a version being developed.

### Using dlt 0.5.x sources
Use `--branch 0.5` if you are still on `dlt` `0.5.x`, for example:
```sh
dlt init <source> <destination> --branch 0.5
```

### List all sources
```sh
dlt init --list-sources
Expand Down
34 changes: 15 additions & 19 deletions tests/cli/test_init_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,21 @@ def test_init_all_sources_isolated(cloned_init_repo: FileStorage) -> None:
assert_index_version_constraint(files, candidate)


def test_init_core_sources_ejected(cloned_init_repo: FileStorage) -> None:
    """Run `init` with `eject_source=True` for each core source and check its folder exists.

    For every name in `CORE_SOURCES` the command is executed against the `bigquery`
    destination, then the requirements file and the presence of a folder named after
    the source are verified.
    """
    # only core sources are iterated here: this test covers the eject path
    for candidate in set(CORE_SOURCES):
        # the repo dir is obtained after cleaning the test storage, so each source
        # gets its own; a repo dir created before the loop would never be read
        clean_test_storage()
        repo_dir = get_repo_dir(cloned_init_repo)
        files = get_project_files(clear_all_sources=False)
        with set_working_dir(files.storage_path):
            init_command.init_command(candidate, "bigquery", repo_dir, eject_source=True)
            assert_requirements_txt(files, "bigquery")
            # ejecting must create a folder named after the source in the project
            assert files.has_folder(candidate)


@pytest.mark.parametrize("destination_name", IMPLEMENTED_DESTINATIONS)
def test_init_all_destinations(
destination_name: str, project_files: FileStorage, repo_dir: str
Expand All @@ -279,25 +294,6 @@ def test_custom_destination_note(repo_dir: str, project_files: FileStorage):
assert "to add a destination function that will consume your data" in _out


@pytest.mark.parametrize("omit", [True, False])
# this will break if we have new core sources that are not in verified sources anymore
@pytest.mark.parametrize("source", set(CORE_SOURCES) - {"rest_api"})
def test_omit_core_sources(
source: str, omit: bool, project_files: FileStorage, repo_dir: str
) -> None:
with io.StringIO() as buf, contextlib.redirect_stdout(buf):
init_command.init_command(source, "destination", repo_dir, omit_core_sources=omit)
_out = buf.getvalue()

# check messaging
assert ("Omitting dlt core sources" in _out) == omit
assert ("will no longer be copied from the" in _out) == (not omit)

# if we omit core sources, there will be a folder with the name of the source from the verified sources repo
assert project_files.has_folder(source) == omit
assert (f"dlt.sources.{source}" in project_files.load(f"{source}_pipeline.py")) == (not omit)


def test_init_code_update_index_diff(repo_dir: str, project_files: FileStorage) -> None:
sources_storage = FileStorage(os.path.join(repo_dir, SOURCES_MODULE_NAME))
new_content = '"""New docstrings"""'
Expand Down
1 change: 0 additions & 1 deletion tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3101,7 +3101,6 @@ def conflict():
p.run([source_2().with_resources("conflict")])
counts = load_table_counts(p, "conflict")
assert counts == {"conflict": 2}
print(p.dataset().conflict.df())


def test_many_pipelines_single_dataset() -> None:
Expand Down

0 comments on commit c0c4f04

Please sign in to comment.