Skip to content

Commit

Permalink
Test happy path and try to attach to pipeline once runner exits
Browse files Browse the repository at this point in the history
  • Loading branch information
sultaniman committed Mar 25, 2024
1 parent 01546e8 commit aaeea00
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 28 deletions.
9 changes: 7 additions & 2 deletions dlt/common/cli/runner/inquirer.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ def maybe_ask(self) -> t.Tuple[dlt.Pipeline, t.Union[DltResource, DltSource]]:
self.preflight_checks()
pipeline_name = self.get_pipeline_name()
pipeline = self.pipelines[pipeline_name]
real_pipeline_name = f" (Pipeline: {pipeline.pipeline_name})"

source_name = self.get_source_name()
resource = self.sources[source_name]
Expand All @@ -59,7 +58,13 @@ def maybe_ask(self) -> t.Tuple[dlt.Pipeline, t.Union[DltResource, DltSource]]:
else:
label = "Source"

real_source_name = f" ({label}: {resource.name})"
real_source_name = ""
if resource.name != source_name:
real_source_name = f" ({resource.name})"

real_pipeline_name = ""
if pipeline.pipeline_name != pipeline_name:
real_pipeline_name = f"({pipeline.pipeline_name})"

fmt.echo("\nPipeline: " + fmt.style(pipeline_name + real_pipeline_name, fg="blue"))

Expand Down
4 changes: 2 additions & 2 deletions dlt/common/cli/runner/pipeline_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def expect_no_pipeline_runs(self): # type: ignore[no-untyped-def]
"""Monkey patch pipeline.run during module loading
Restore it once importing is done
"""
old_run = dlt.Pipeline.run
original_run = dlt.Pipeline.run

def noop(*args, **kwargs) -> LoadInfo: # type: ignore
self.has_pipeline_auto_runs = True
Expand All @@ -80,7 +80,7 @@ def noop(*args, **kwargs) -> LoadInfo: # type: ignore

yield

dlt.Pipeline.run = old_run # type: ignore
dlt.Pipeline.run = original_run # type: ignore

@property
def pipeline_module(self) -> ModuleType:
Expand Down
12 changes: 6 additions & 6 deletions tests/cli/cases/cli_runner/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
@dlt.resource
def quads_resource():
for idx in range(10):
yield {"id": idx, "quad": idx**4}
yield {"id": idx, "num": idx**4}


@dlt.resource
def squares_resource():
def numbers_resource():
for idx in range(10):
yield {"id": idx, "square": idx * idx}
yield {"id": idx, "num": idx + 1}


@dlt.destination(loader_file_format="parquet")
Expand All @@ -19,17 +19,17 @@ def null_sink(_items, _table) -> None:


quads_resource_instance = quads_resource()
squares_resource_instance = squares_resource()
numbers_resource_instance = numbers_resource()

quads_pipeline = dlt.pipeline(
pipeline_name="numbers_quadruples_pipeline",
destination=null_sink,
)

squares_pipeline = dlt.pipeline(
numbers_pipeline = dlt.pipeline(
pipeline_name="numbers_pipeline",
destination="duckdb",
)

if __name__ == "__main__":
load_info = squares_pipeline.run(squares_resource(), schema=dlt.Schema("bobo-schema"))
load_info = numbers_pipeline.run(numbers_resource(), schema=dlt.Schema("bobo-schema"))
40 changes: 22 additions & 18 deletions tests/cli/test_run_command.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,45 @@
import io
import os
import contextlib

import shutil
from unittest import mock

import pytest
import dlt

from dlt.cli import run_command

from tests.utils import TESTS_ROOT
from dlt.common.utils import set_working_dir
from tests.utils import TEST_STORAGE_ROOT, TESTS_ROOT

RUNNER_PIPELINES = TESTS_ROOT / "cli/cases/cli_runner"
TEST_PIPELINE = RUNNER_PIPELINES / "pipeline.py"
TEST_PIPELINE_WITH_IMMEDIATE_RUN = RUNNER_PIPELINES / "pipeline_with_immediate_run.py"
TEST_PIPELINE_CONTENTS = open(RUNNER_PIPELINES / "pipeline.py").read().strip()

CLI_RUNNER_PIPELINES = TESTS_ROOT / "cli/cases/cli_runner"
TEST_PIPELINE = CLI_RUNNER_PIPELINES / "pipeline.py"
TEST_PIPELINE_WITH_IMMEDIATE_RUN = CLI_RUNNER_PIPELINES / "pipeline_with_immediate_run.py"

@pytest.fixture(scope="module")
def ch_pipeline_dir():
cwd = os.getcwd()
os.chdir(RUNNER_PIPELINES)
yield
os.chdir(cwd)

def test_run_command_happy_path_works_as_expected():
pipeline_name = "numbers_pipeline"
p = dlt.pipeline(pipeline_name=pipeline_name)
p._wipe_working_folder()
shutil.copytree(CLI_RUNNER_PIPELINES, TEST_STORAGE_ROOT, dirs_exist_ok=True)

def test_run_command_requires_working_directory_same_as_pipeline_working_directory():
with io.StringIO() as buf, contextlib.redirect_stdout(buf):
with io.StringIO() as buf, contextlib.redirect_stdout(buf), set_working_dir(TEST_STORAGE_ROOT):
run_command.run_pipeline_command(
str(TEST_PIPELINE),
"squares_pipeline",
"squares_resource_instance",
pipeline_name,
"numbers_resource_instance",
["write_disposition=merge", "loader_file_format=jsonl"],
)

output = buf.getvalue()
assert "Current working directory is different from the pipeline script" in output
assert "If needed please change your current directory to" in output
assert "Pipeline: numbers_pipeline" in output
assert "Resource: numbers_resource_instance (numbers_resource)" in output
assert "Pipeline numbers_pipeline load step completed" in output
assert "contains no failed jobs" in output

# Check if we can successfully attach to pipeline
dlt.attach(pipeline_name)


def test_run_command_fails_with_relevant_error_if_pipeline_resource_or_source_not_found():
Expand Down

0 comments on commit aaeea00

Please sign in to comment.