From e42aeb0da26a3610384fff63c9f6a33e1de37a65 Mon Sep 17 00:00:00 2001 From: kesompochy Date: Mon, 9 Sep 2024 09:40:12 +0900 Subject: [PATCH 1/7] Optimize DbtVirtualenvBaseOperator: Implement virtualenv reuse - Reuse virtualenv in single task execution to reduce creation overhead - Improve temporary directory management to use TemporaryDirectory when virtualenv_dir is set to None --- cosmos/operators/virtualenv.py | 67 ++++++++++++++++-------------- tests/operators/test_virtualenv.py | 23 ++++++---- 2 files changed, 51 insertions(+), 39 deletions(-) diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index c8b31f6af..1a692c172 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -29,7 +29,7 @@ if TYPE_CHECKING: from airflow.utils.context import Context - + from dbt.cli.main import dbtRunnerResult PY_INTERPRETER = "python3" LOCK_FILENAME = "cosmos_virtualenv.lock" @@ -77,49 +77,54 @@ def __init__( self.virtualenv_dir = virtualenv_dir self.is_virtualenv_dir_temporary = is_virtualenv_dir_temporary self.max_retries_lock = settings.virtualenv_max_retries_lock + self._py_bin = None super().__init__(**kwargs) if not self.py_requirements: self.log.error("Cosmos virtualenv operators require the `py_requirements` parameter") def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> FullOutputSubprocessResult: - # No virtualenv_dir set, so create a temporary virtualenv - if self.virtualenv_dir is None or self.is_virtualenv_dir_temporary: - self.log.info("Creating temporary virtualenv") - with TemporaryDirectory(prefix="cosmos-venv") as tempdir: - self.virtualenv_dir = Path(tempdir) - py_bin = self._prepare_virtualenv() - dbt_bin = str(Path(py_bin).parent / "dbt") - command[0] = dbt_bin # type: ignore - subprocess_result: FullOutputSubprocessResult = self.subprocess_hook.run_command( - command=command, - env=env, - cwd=cwd, - output_encoding=self.output_encoding, - ) - return subprocess_result - - # Use a reusable virtualenv - self.log.info(f"Checking if the virtualenv lock {str(self._lock_file)} exists") - while not self._is_lock_available() and self.max_retries_lock: - logger.info("Waiting for virtualenv lock to be released") - time.sleep(1) - self.max_retries_lock -= 1 - - self.log.info(f"Acquiring the virtualenv lock") - self._acquire_venv_lock() - py_bin = self._prepare_virtualenv() - dbt_bin = str(Path(py_bin).parent / "dbt") - command[0] = dbt_bin # type: ignore + self.log.info("Trying to run the command:\n %s\nFrom %s", command, cwd) + if self._py_bin is not None: + self.log.info(f"Using Python binary from virtualenv: {self._py_bin}") + command[0] = str(Path(self._py_bin).parent / "dbt") subprocess_result = self.subprocess_hook.run_command( command=command, env=env, cwd=cwd, output_encoding=self.output_encoding, ) - self.log.info("Releasing virtualenv lock") - self._release_venv_lock() + self.log.info(subprocess_result.output) return subprocess_result + def run_command( + self, + cmd: list[str], + env: dict[str, str | bytes | os.PathLike[Any]], + context: Context, + ) -> FullOutputSubprocessResult | dbtRunnerResult: + # No virtualenv_dir set, so create a temporary virtualenv + if self.virtualenv_dir is None or self.is_virtualenv_dir_temporary: + self.log.info("Creating temporary virtualenv") + with TemporaryDirectory(prefix="cosmos-venv") as tempdir: + self.virtualenv_dir = Path(tempdir) + self._py_bin = self._prepare_virtualenv() + return super().run_command(cmd, env, context) + + try: + self.log.info(f"Checking if the virtualenv lock {str(self._lock_file)} exists") + while not self._is_lock_available() and self.max_retries_lock: + logger.info("Waiting for virtualenv lock to be released") + time.sleep(1) + self.max_retries_lock -= 1 + + self.log.info("Acquiring the virtualenv lock") + self._acquire_venv_lock() + self._py_bin = self._prepare_virtualenv() + return super().run_command(cmd, env, context) + finally: + self.log.info("Releasing virtualenv lock") + self._release_venv_lock() + def clean_dir_if_temporary(self) -> None: """ Delete the virtualenv directory if it is temporary. diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index 0a7128626..76cb71bf6 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -77,9 +77,16 @@ def test_run_command_without_virtualenv_dir( assert dbt_deps["command"][0] == dbt_cmd["command"][0] assert dbt_deps["command"][1] == "deps" assert dbt_cmd["command"][1] == "do-something" - assert mock_execute.call_count == 4 + assert mock_execute.call_count == 2 + virtualenv_call, pip_install_call = mock_execute.call_args_list + assert "python" in virtualenv_call[0][0][0] + assert virtualenv_call[0][0][1] == "-m" + assert virtualenv_call[0][0][2] == "virtualenv" + assert "pip" in pip_install_call[0][0][0] + assert pip_install_call[0][0][1] == "install" +@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._release_venv_lock") @patch("airflow.utils.python_virtualenv.execute_in_subprocess") @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.calculate_openlineage_events_completes") @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.store_compiled_sql") @@ -93,6 +100,8 @@ def test_run_command_with_virtualenv_dir( mock_store_compiled_sql, mock_calculate_openlineage_events_completes, mock_execute, + mock_release_venv_lock, + caplog, ): mock_get_connection.return_value = Connection( conn_id="fake_conn", @@ -124,6 +133,8 @@ def test_run_command_with_virtualenv_dir( dbt_cmd = run_command_args[1].kwargs assert dbt_deps["command"][0] == "mock-venv/bin/dbt" assert dbt_cmd["command"][0] == "mock-venv/bin/dbt" + caplog.text.count("Waiting for virtualenv lock to be released") == 2 + assert mock_release_venv_lock.called_once() def test_virtualenv_operator_append_env_is_true_by_default(): @@ -184,12 +195,8 @@ def test_on_kill(mock_clean_dir_if_temporary): @patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator.subprocess_hook") -@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._release_venv_lock") -@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._prepare_virtualenv") -@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._acquire_venv_lock") -@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._is_lock_available", side_effect=[False, False, True]) -def test_run_subprocess_waits_for_lock( - mock_is_lock_available, mock_acquire, mock_prepare, mock_release, mock_subprocess_hook, tmpdir, caplog +def test_run_subprocess( + mock_subprocess_hook, tmpdir, caplog ): venv_operator = ConcreteDbtVirtualenvBaseOperator( profile_config=profile_config, @@ -199,7 +206,7 @@ def test_run_subprocess_waits_for_lock( virtualenv_dir=tmpdir, ) venv_operator.run_subprocess(["dbt", "run"], {}, "./dev/dags/dbt/jaffle_shop") - assert caplog.text.count("Waiting for virtualenv lock to be released") == 2 + assert len(mock_subprocess_hook.run_command.call_args_list) == 1 @patch("cosmos.operators.local.DbtLocalBaseOperator.execute", side_effect=ValueError) From a84d16c74c7b0a92f6ac2940f4f316d92e2dd420 Mon Sep 17 00:00:00 2001 From: kesompochy Date: Mon, 9 Sep 2024 09:42:43 +0900 Subject: [PATCH 2/7] Add test to ensure virtualenv directory will be deleted or persist Related to #958 --- tests/operators/test_virtualenv.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index 76cb71bf6..995039dd3 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -84,6 +84,10 @@ def test_run_command_without_virtualenv_dir( assert virtualenv_call[0][0][2] == "virtualenv" assert "pip" in pip_install_call[0][0][0] assert pip_install_call[0][0][1] == "install" + cosmos_venv_dirs = [ + f for f in os.listdir("/tmp") if os.path.isdir(os.path.join("/tmp", f)) and f.startswith("cosmos-venv") + ] + assert len(cosmos_venv_dirs) == 0 @patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._release_venv_lock") @@ -135,6 +139,10 @@ def test_run_command_with_virtualenv_dir( assert dbt_cmd["command"][0] == "mock-venv/bin/dbt" caplog.text.count("Waiting for virtualenv lock to be released") == 2 assert mock_release_venv_lock.called_once() + cosmos_venv_dirs = [ + f for f in os.listdir() if f == "mock-venv" + ] + assert len(cosmos_venv_dirs) == 1 def test_virtualenv_operator_append_env_is_true_by_default(): From acba2e6b0b10107b6579fdf53bd64662b0695673 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 9 Sep 2024 01:04:26 +0000 Subject: [PATCH 3/7] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/operators/test_virtualenv.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index 995039dd3..f8e15604f 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -139,9 +139,7 @@ def test_run_command_with_virtualenv_dir( assert dbt_cmd["command"][0] == "mock-venv/bin/dbt" caplog.text.count("Waiting for virtualenv lock to be released") == 2 assert mock_release_venv_lock.called_once() - cosmos_venv_dirs = [ - f for f in os.listdir() if f == "mock-venv" - ] + cosmos_venv_dirs = [f for f in os.listdir() if f == "mock-venv"] assert len(cosmos_venv_dirs) == 1 @@ -203,9 +201,7 @@ def test_on_kill(mock_clean_dir_if_temporary): @patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator.subprocess_hook") -def test_run_subprocess( - mock_subprocess_hook, tmpdir, caplog -): +def test_run_subprocess(mock_subprocess_hook, tmpdir, caplog): venv_operator = ConcreteDbtVirtualenvBaseOperator( profile_config=profile_config, project_dir="./dev/dags/dbt/jaffle_shop", From f70c8a11c1e88e009b291bb64fa6f71a64d33e01 Mon Sep 17 00:00:00 2001 From: kesompochy Date: Mon, 9 Sep 2024 10:18:43 +0900 Subject: [PATCH 4/7] Fix type annotation for self._py_bin in DbtVirtualenvBaseOperator --- cosmos/operators/virtualenv.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 1a692c172..524fce2d2 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -77,7 +77,7 @@ def __init__( self.virtualenv_dir = virtualenv_dir self.is_virtualenv_dir_temporary = is_virtualenv_dir_temporary self.max_retries_lock = settings.virtualenv_max_retries_lock - self._py_bin = None + self._py_bin: str | None = None super().__init__(**kwargs) if not self.py_requirements: self.log.error("Cosmos virtualenv operators require the `py_requirements` parameter") From 7ad033b4e1912386ea4f95552db1a8b1d7619bd1 Mon Sep 17 00:00:00 2001 From: kesompochy Date: Mon, 9 Sep 2024 10:19:30 +0900 Subject: [PATCH 5/7] Fix assertion in test_run_command_with_virtualenv_dir --- tests/operators/test_virtualenv.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index 995039dd3..46135fae3 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -138,10 +138,8 @@ def test_run_command_with_virtualenv_dir( assert dbt_deps["command"][0] == "mock-venv/bin/dbt" assert dbt_cmd["command"][0] == "mock-venv/bin/dbt" caplog.text.count("Waiting for virtualenv lock to be released") == 2 - assert mock_release_venv_lock.called_once() - cosmos_venv_dirs = [ - f for f in os.listdir() if f == "mock-venv" - ] + assert mock_release_venv_lock.call_count == 1 + cosmos_venv_dirs = [f for f in os.listdir() if f == "mock-venv"] assert len(cosmos_venv_dirs) == 1 @@ -203,9 +201,7 @@ def test_on_kill(mock_clean_dir_if_temporary): @patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator.subprocess_hook") -def test_run_subprocess( - mock_subprocess_hook, tmpdir, caplog -): +def test_run_subprocess(mock_subprocess_hook, tmpdir, caplog): venv_operator = ConcreteDbtVirtualenvBaseOperator( profile_config=profile_config, project_dir="./dev/dags/dbt/jaffle_shop", From 6dc921530439ad4028cf466aabe4da94f8af17b1 Mon Sep 17 00:00:00 2001 From: kesompochy Date: Tue, 24 Sep 2024 19:05:55 +0900 Subject: [PATCH 6/7] Exclude TYPE_CHECKING imports from coverage reporting for accuracy --- cosmos/operators/virtualenv.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 524fce2d2..ebc8bcf3b 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -28,8 +28,8 @@ ) if TYPE_CHECKING: - from airflow.utils.context import Context - from dbt.cli.main import dbtRunnerResult + from airflow.utils.context import Context # pragma: no cover + from dbt.cli.main import dbtRunnerResult # pragma: no cover PY_INTERPRETER = "python3" LOCK_FILENAME = "cosmos_virtualenv.lock" From 3f82e5931baf801f73e89451d39db582e3aa3aeb Mon Sep 17 00:00:00 2001 From: kesompochy Date: Tue, 24 Sep 2024 19:08:25 +0900 Subject: [PATCH 7/7] Add tests for DbtVirtualenvBaseOperator lock mechanism --- tests/operators/test_virtualenv.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index 46135fae3..f0e7c1a22 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -90,6 +90,8 @@ def test_run_command_without_virtualenv_dir( assert len(cosmos_venv_dirs) == 0 +@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._is_lock_available") +@patch("time.sleep") @patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._release_venv_lock") @patch("airflow.utils.python_virtualenv.execute_in_subprocess") @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.calculate_openlineage_events_completes") @@ -105,8 +107,11 @@ def test_run_command_with_virtualenv_dir( mock_calculate_openlineage_events_completes, mock_execute, mock_release_venv_lock, + mock_sleep, + mock_is_lock_available, caplog, ): + mock_is_lock_available.side_effect = [False, False, True] mock_get_connection.return_value = Connection( conn_id="fake_conn", conn_type="postgres", @@ -137,7 +142,9 @@ def test_run_command_with_virtualenv_dir( dbt_cmd = run_command_args[1].kwargs assert dbt_deps["command"][0] == "mock-venv/bin/dbt" assert dbt_cmd["command"][0] == "mock-venv/bin/dbt" - caplog.text.count("Waiting for virtualenv lock to be released") == 2 + assert caplog.text.count("Waiting for virtualenv lock to be released") == 2 + assert mock_sleep.call_count == 2 + assert mock_is_lock_available.call_count == 3 assert mock_release_venv_lock.call_count == 1 cosmos_venv_dirs = [f for f in os.listdir() if f == "mock-venv"] assert len(cosmos_venv_dirs) == 1