Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize DbtVirtualenvBaseOperator: Single virtualenv per task execution #1200

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 37 additions & 32 deletions cosmos/operators/virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
)

if TYPE_CHECKING:
from airflow.utils.context import Context

from airflow.utils.context import Context # pragma: no cover
from dbt.cli.main import dbtRunnerResult # pragma: no cover

PY_INTERPRETER = "python3"
LOCK_FILENAME = "cosmos_virtualenv.lock"
Expand Down Expand Up @@ -77,49 +77,54 @@ def __init__(
self.virtualenv_dir = virtualenv_dir
self.is_virtualenv_dir_temporary = is_virtualenv_dir_temporary
self.max_retries_lock = settings.virtualenv_max_retries_lock
self._py_bin: str | None = None
super().__init__(**kwargs)
if not self.py_requirements:
self.log.error("Cosmos virtualenv operators require the `py_requirements` parameter")

def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> FullOutputSubprocessResult:
# No virtualenv_dir set, so create a temporary virtualenv
if self.virtualenv_dir is None or self.is_virtualenv_dir_temporary:
self.log.info("Creating temporary virtualenv")
with TemporaryDirectory(prefix="cosmos-venv") as tempdir:
self.virtualenv_dir = Path(tempdir)
py_bin = self._prepare_virtualenv()
dbt_bin = str(Path(py_bin).parent / "dbt")
command[0] = dbt_bin # type: ignore
subprocess_result: FullOutputSubprocessResult = self.subprocess_hook.run_command(
command=command,
env=env,
cwd=cwd,
output_encoding=self.output_encoding,
)
return subprocess_result

# Use a reusable virtualenv
self.log.info(f"Checking if the virtualenv lock {str(self._lock_file)} exists")
while not self._is_lock_available() and self.max_retries_lock:
logger.info("Waiting for virtualenv lock to be released")
time.sleep(1)
self.max_retries_lock -= 1

self.log.info(f"Acquiring the virtualenv lock")
self._acquire_venv_lock()
py_bin = self._prepare_virtualenv()
dbt_bin = str(Path(py_bin).parent / "dbt")
command[0] = dbt_bin # type: ignore
self.log.info("Trying to run the command:\n %s\nFrom %s", command, cwd)
if self._py_bin is not None:
self.log.info(f"Using Python binary from virtualenv: {self._py_bin}")
command[0] = str(Path(self._py_bin).parent / "dbt")
subprocess_result = self.subprocess_hook.run_command(
command=command,
env=env,
cwd=cwd,
output_encoding=self.output_encoding,
)
self.log.info("Releasing virtualenv lock")
self._release_venv_lock()
self.log.info(subprocess_result.output)
return subprocess_result

def run_command(
self,
cmd: list[str],
env: dict[str, str | bytes | os.PathLike[Any]],
context: Context,
) -> FullOutputSubprocessResult | dbtRunnerResult:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this fail when dbt is not installed on the Airflow worker node?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your feedback!

This PR intends to install dbt within the virtual environment by running `pip install`. I have also added tests to verify that the `pip install` command is executed within the prepared virtual environment, so that dbt is available even if it's not pre-installed on the Airflow worker node.

# No virtualenv_dir set, so create a temporary virtualenv
if self.virtualenv_dir is None or self.is_virtualenv_dir_temporary:
self.log.info("Creating temporary virtualenv")
with TemporaryDirectory(prefix="cosmos-venv") as tempdir:
self.virtualenv_dir = Path(tempdir)
self._py_bin = self._prepare_virtualenv()
return super().run_command(cmd, env, context)

try:
self.log.info(f"Checking if the virtualenv lock {str(self._lock_file)} exists")
while not self._is_lock_available() and self.max_retries_lock:
logger.info("Waiting for virtualenv lock to be released")
time.sleep(1)
self.max_retries_lock -= 1

self.log.info("Acquiring the virtualenv lock")
self._acquire_venv_lock()
self._py_bin = self._prepare_virtualenv()
return super().run_command(cmd, env, context)
finally:
self.log.info("Releasing virtualenv lock")
self._release_venv_lock()

def clean_dir_if_temporary(self) -> None:
"""
Delete the virtualenv directory if it is temporary.
Expand Down
40 changes: 29 additions & 11 deletions tests/operators/test_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,22 @@ def test_run_command_without_virtualenv_dir(
assert dbt_deps["command"][0] == dbt_cmd["command"][0]
assert dbt_deps["command"][1] == "deps"
assert dbt_cmd["command"][1] == "do-something"
assert mock_execute.call_count == 4


assert mock_execute.call_count == 2
virtualenv_call, pip_install_call = mock_execute.call_args_list
assert "python" in virtualenv_call[0][0][0]
assert virtualenv_call[0][0][1] == "-m"
assert virtualenv_call[0][0][2] == "virtualenv"
assert "pip" in pip_install_call[0][0][0]
assert pip_install_call[0][0][1] == "install"
cosmos_venv_dirs = [
f for f in os.listdir("/tmp") if os.path.isdir(os.path.join("/tmp", f)) and f.startswith("cosmos-venv")
]
assert len(cosmos_venv_dirs) == 0


@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._is_lock_available")
@patch("time.sleep")
@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._release_venv_lock")
@patch("airflow.utils.python_virtualenv.execute_in_subprocess")
@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.calculate_openlineage_events_completes")
@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.store_compiled_sql")
Expand All @@ -93,7 +106,12 @@ def test_run_command_with_virtualenv_dir(
mock_store_compiled_sql,
mock_calculate_openlineage_events_completes,
mock_execute,
mock_release_venv_lock,
mock_sleep,
mock_is_lock_available,
caplog,
):
mock_is_lock_available.side_effect = [False, False, True]
mock_get_connection.return_value = Connection(
conn_id="fake_conn",
conn_type="postgres",
Expand Down Expand Up @@ -124,6 +142,12 @@ def test_run_command_with_virtualenv_dir(
dbt_cmd = run_command_args[1].kwargs
assert dbt_deps["command"][0] == "mock-venv/bin/dbt"
assert dbt_cmd["command"][0] == "mock-venv/bin/dbt"
assert caplog.text.count("Waiting for virtualenv lock to be released") == 2
assert mock_sleep.call_count == 2
assert mock_is_lock_available.call_count == 3
assert mock_release_venv_lock.call_count == 1
cosmos_venv_dirs = [f for f in os.listdir() if f == "mock-venv"]
assert len(cosmos_venv_dirs) == 1


def test_virtualenv_operator_append_env_is_true_by_default():
Expand Down Expand Up @@ -184,13 +208,7 @@ def test_on_kill(mock_clean_dir_if_temporary):


@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator.subprocess_hook")
@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._release_venv_lock")
@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._prepare_virtualenv")
@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._acquire_venv_lock")
@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._is_lock_available", side_effect=[False, False, True])
def test_run_subprocess_waits_for_lock(
mock_is_lock_available, mock_acquire, mock_prepare, mock_release, mock_subprocess_hook, tmpdir, caplog
):
def test_run_subprocess(mock_subprocess_hook, tmpdir, caplog):
venv_operator = ConcreteDbtVirtualenvBaseOperator(
profile_config=profile_config,
project_dir="./dev/dags/dbt/jaffle_shop",
Expand All @@ -199,7 +217,7 @@ def test_run_subprocess_waits_for_lock(
virtualenv_dir=tmpdir,
)
venv_operator.run_subprocess(["dbt", "run"], {}, "./dev/dags/dbt/jaffle_shop")
assert caplog.text.count("Waiting for virtualenv lock to be released") == 2
assert len(mock_subprocess_hook.run_command.call_args_list) == 1


@patch("cosmos.operators.local.DbtLocalBaseOperator.execute", side_effect=ValueError)
Expand Down
Loading