diff --git a/src/aap_eda/services/activation/engine/kubernetes.py b/src/aap_eda/services/activation/engine/kubernetes.py
index ed3a9ff8e..06955d9ce 100644
--- a/src/aap_eda/services/activation/engine/kubernetes.py
+++ b/src/aap_eda/services/activation/engine/kubernetes.py
@@ -199,9 +199,6 @@ def cleanup(self, container_id: str, log_handler: LogHandler) -> None:
         self._delete_services(log_handler)
         self._delete_job(log_handler)
 
-        self._wait_for_pod_to_be_deleted(container_id, log_handler)
-        log_handler.write(f"Job {container_id} is cleaned up.", flush=True)
-
     def update_logs(self, container_id: str, log_handler: LogHandler) -> None:
         try:
             pod = self._get_job_pod(container_id)
@@ -460,7 +457,7 @@ def _create_job(
             job_result = self.client.batch_api.create_namespaced_job(
                 namespace=self.namespace, body=job_spec
             )
-            LOGGER.info(f"Submitted Job template: {self.job_name},")
+            LOGGER.info(f"Submitted Job template: {self.job_name}")
         except ApiException as e:
             LOGGER.error(f"API Exception {e}")
             raise ContainerStartError(str(e)) from e
@@ -485,11 +482,44 @@ def _delete_job(self, log_handler: LogHandler) -> None:
 
                 if result.status == "Failure":
                     raise ContainerCleanupError(f"{result}")
+
+                watcher = watch.Watch()
+                try:
+                    for event in watcher.stream(
+                        self.client.core_api.list_namespaced_pod,
+                        namespace=self.namespace,
+                        label_selector=f"job-name={self.job_name}",
+                        timeout_seconds=POD_DELETE_TIMEOUT,
+                    ):
+                        if event["type"] == "DELETED":
+                            log_handler.write(
+                                f"Pod '{self.job_name}' is deleted.",
+                                flush=True,
+                            )
+                            break
+
+                    log_handler.write(
+                        f"Job {self.job_name} is cleaned up.",
+                        flush=True,
+                    )
+                except ApiException as e:
+                    if e.status == status.HTTP_404_NOT_FOUND:
+                        message = (
+                            f"Pod '{self.job_name}' not found (404), "
+                            "assuming it's already deleted."
+                        )
+                        log_handler.write(message, flush=True)
+                        return
+                    log_handler.write(
+                        f"Error while waiting for deletion: {e}", flush=True
+                    )
+                    raise ContainerCleanupError(
+                        f"Error during cleanup: {str(e)}"
+                    ) from e
+                finally:
+                    watcher.stop()
             else:
                 LOGGER.info(f"Job for {self.job_name} has been removed")
-                log_handler.write(
-                    f"Job for {self.job_name} has been removed.", True
-                )
 
         except ApiException as e:
             raise ContainerCleanupError(
@@ -532,50 +562,6 @@ def _wait_for_pod_to_start(self, log_handler: LogHandler) -> None:
         finally:
             watcher.stop()
 
-    def _wait_for_pod_to_be_deleted(
-        self, pod_name: str, log_handler: LogHandler
-    ) -> None:
-        log_handler.write(
-            f"Waiting for pod '{pod_name}' to be deleted...",
-            flush=True,
-        )
-        watcher = watch.Watch()
-        try:
-            for event in watcher.stream(
-                self.client.core_api.list_namespaced_pod,
-                namespace=self.namespace,
-                label_selector=f"job-name={self.job_name}",
-                timeout_seconds=POD_DELETE_TIMEOUT,
-            ):
-                LOGGER.debug(f"Received event: {event}")
-                pod_name = event["object"].metadata.name
-                pod_phase = event["object"].status.phase
-                LOGGER.debug(f"Pod {pod_name} - {pod_phase}")
-
-                if event["type"] == "DELETED":
-                    # Pod successfully deleted
-                    log_handler.write(
-                        f"Pod '{pod_name}' has been deleted.", flush=True
-                    )
-                    break
-        except ApiException as e:
-            if e.status == status.HTTP_404_NOT_FOUND:
-                message = (
-                    f"Pod '{pod_name}' not found (404), "
-                    "assuming it's already deleted."
-                )
-                log_handler.write(message, flush=True)
-                return
-            log_handler.write(
-                f"Error while waiting for deletion: {e}", flush=True
-            )
-            raise ContainerCleanupError(
-                f"Error during cleanup: {str(e)}"
-            ) from e
-
-        finally:
-            watcher.stop()
-
     def _set_namespace(self) -> None:
         namespace_file = (
             "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
diff --git a/tests/integration/services/activation/engine/test_kubernetes.py b/tests/integration/services/activation/engine/test_kubernetes.py
index f784fbb10..2800e23b7 100644
--- a/tests/integration/services/activation/engine/test_kubernetes.py
+++ b/tests/integration/services/activation/engine/test_kubernetes.py
@@ -535,8 +535,27 @@ def raise_api_error(*args, **kwargs):
         engine._delete_services(log_handler)
 
 
+@mock.patch("aap_eda.services.activation.engine.kubernetes.watch.Watch")
 @pytest.mark.django_db
-def test_delete_job(init_kubernetes_data, kubernetes_engine):
+def test_delete_job(mock_watch, init_kubernetes_data, kubernetes_engine):
+    # Setup mock watch stream
+    mock_watch_instance = mock.MagicMock()
+    mock_watch.return_value = mock_watch_instance
+
+    # Simulate a DELETE event
+    mock_metadata = mock.MagicMock()
+    mock_metadata.name = "test-pod"
+
+    mock_watch_instance.stream.return_value = [
+        {
+            "type": "DELETED",
+            "object": mock.MagicMock(
+                metadata=mock_metadata,
+                status=mock.MagicMock(phase="Terminating"),
+            ),
+        }
+    ]
+
     engine = kubernetes_engine
     job_name = "eda-job"
     engine.job_name = job_name
@@ -556,7 +575,7 @@ def test_delete_job(init_kubernetes_data, kubernetes_engine):
         engine._delete_job(log_handler)
 
     assert models.RulebookProcessLog.objects.last().log.endswith(
-        f"Job for {job_name} has been removed."
+        f"Job {job_name} is cleaned up."
     )
 
 
@@ -587,95 +606,6 @@
             engine._delete_job(log_handler)
 
 
-@mock.patch("aap_eda.services.activation.engine.kubernetes.watch.Watch")
-@mock.patch(
-    "aap_eda.services.activation.engine.kubernetes.k8sclient.CoreV1Api"
-)
-@pytest.mark.django_db
-def test_wait_for_pod_to_be_deleted_success(
-    mock_core_api, mock_watch, kubernetes_engine, mock_log_handler
-):
-    # Setup mock watch stream
-    mock_watch_instance = mock.MagicMock()
-    mock_watch.return_value = mock_watch_instance
-
-    # Simulate a DELETE event
-    mock_metadata = mock.MagicMock()
-    mock_metadata.name = "test-pod"
-
-    mock_watch_instance.stream.return_value = [
-        {
-            "type": "DELETED",
-            "object": mock.MagicMock(
-                metadata=mock_metadata,
-                status=mock.MagicMock(phase="Terminating"),
-            ),
-        }
-    ]
-
-    kubernetes_engine._wait_for_pod_to_be_deleted("test-pod", mock_log_handler)
-
-    # Verify log handler was called correctly
-    mock_log_handler.write.assert_any_call(
-        "Waiting for pod 'test-pod' to be deleted...", flush=True
-    )
-    mock_log_handler.write.assert_any_call(
-        "Pod 'test-pod' has been deleted.", flush=True
-    )
-    mock_watch_instance.stop.assert_called_once()
-
-
-@mock.patch("aap_eda.services.activation.engine.kubernetes.watch.Watch")
-@mock.patch(
-    "aap_eda.services.activation.engine.kubernetes.k8sclient.CoreV1Api"
-)
-@pytest.mark.django_db
-def test_wait_for_pod_to_be_deleted_not_found(
-    mock_core_api, mock_watch, kubernetes_engine, mock_log_handler
-):
-    # Setup mock to raise 404 error
-    mock_watch_instance = mock.MagicMock()
-    mock_watch.return_value = mock_watch_instance
-    mock_watch_instance.stream.side_effect = ApiException(
-        status=status.HTTP_404_NOT_FOUND
-    )
-
-    kubernetes_engine._wait_for_pod_to_be_deleted("test-pod", mock_log_handler)
-
-    mock_log_handler.write.assert_any_call(
-        "Pod 'test-pod' not found (404), assuming it's already deleted.",
-        flush=True,
-    )
-    mock_watch_instance.stop.assert_called_once()
-
-
-@mock.patch("aap_eda.services.activation.engine.kubernetes.watch.Watch")
-@mock.patch(
-    "aap_eda.services.activation.engine.kubernetes.k8sclient.CoreV1Api"
-)
-@pytest.mark.django_db
-def test_wait_for_pod_to_be_deleted_error(
-    mock_core_api, mock_watch, kubernetes_engine, mock_log_handler
-):
-    # Setup mock to raise other API error
-    mock_watch_instance = mock.MagicMock()
-    mock_watch.return_value = mock_watch_instance
-    mock_watch_instance.stream.side_effect = ApiException(
-        status=status.HTTP_500_INTERNAL_SERVER_ERROR, reason="test-failure"
-    )
-
-    with pytest.raises(ContainerCleanupError):
-        kubernetes_engine._wait_for_pod_to_be_deleted(
-            "test-pod", mock_log_handler
-        )
-
-    mock_log_handler.write.assert_any_call(
-        "Error while waiting for deletion: (500)\nReason: test-failure\n",
-        flush=True,
-    )
-    mock_watch_instance.stop.assert_called_once()
-
-
 @pytest.mark.django_db
 def test_cleanup_orig(init_kubernetes_data, kubernetes_engine):
     engine = kubernetes_engine
@@ -685,162 +615,7 @@ def test_cleanup_orig(init_kubernetes_data, kubernetes_engine):
     with mock.patch.object(engine, "_delete_secret") as secret_mock:
         with mock.patch.object(engine, "_delete_services") as services_mock:
             with mock.patch.object(engine, "_delete_job") as job_mock:
-                with mock.patch.object(engine, "_wait_for_pod_to_be_deleted"):
-                    engine.cleanup(job_name, log_handler)
-                    secret_mock.assert_called_once()
-                    services_mock.assert_called_once()
-                    job_mock.assert_called_once()
-
-    assert models.RulebookProcessLog.objects.last().log.endswith(
-        f"Job {job_name} is cleaned up."
-    )
-
-
-@pytest.mark.django_db
-def test_update_logs(init_kubernetes_data, kubernetes_engine):
-    engine = kubernetes_engine
-    log_handler = DBLogger(init_kubernetes_data.activation_instance.id)
-    init_log_read_at = init_kubernetes_data.activation_instance.log_read_at
-    job_name = "eda-job"
-    pod_mock = mock.Mock()
-
-    with mock.patch.object(
-        engine, "_get_job_pod", mock.Mock(return_value=pod_mock)
-    ):
-        pod_mock.status.container_statuses = get_pod_statuses("running")
-        log_mock = mock.Mock()
-        message = "INFO Result is kept for 500 seconds"
-        with mock.patch.object(engine.client, "core_api") as core_api_mock:
-            core_api_mock.read_namespaced_pod_log.return_value = log_mock
-            log_mock.splitlines.return_value = [
-                (
-                    "2023-10-30T19:18:48.362883381Z 2023-10-30 19:18:48,362"
-                    " INFO Task started: Monitor project tasks"
-                ),
-                (
-                    "2023-10-30T19:18:48.375144193Z 2023-10-30 19:18:48,374"
-                    " INFO Task complete: Monitor project tasks"
-                ),
-                (
-                    "2023-10-30T19:18:48.376026733Z 2023-10-30 19:18:48,375"
-                    " INFO default: Job OK (monitor_project_tasks)"
-                ),
-                f"2023-10-30T19:28:48.376034150Z {message}",
-            ]
-            engine.update_logs(job_name, log_handler)
-
-    assert models.RulebookProcessLog.objects.last().log == f"{message}"
-    init_kubernetes_data.activation_instance.refresh_from_db()
-    assert (
-        init_kubernetes_data.activation_instance.log_read_at
-        > init_log_read_at
-    )
-
-    def raise_api_error(*args, **kwargs):
-        raise ApiException("Container not found")
-
-    with mock.patch.object(engine.client, "core_api") as core_api_mock:
-        core_api_mock.read_namespaced_pod_log.side_effect = raise_api_error
-
-        with pytest.raises(ContainerUpdateLogsError):
-            engine.update_logs(job_name, log_handler)
-
-    with mock.patch.object(
-        engine, "_get_job_pod", mock.Mock(return_value=pod_mock)
-    ):
-        pod_mock.status.container_statuses = get_pod_statuses("unknown")
-        log_mock = mock.Mock()
-        with mock.patch.object(engine.client, "core_api") as core_api_mock:
-            engine.update_logs(job_name, log_handler)
-            msg = f"Pod with label {job_name} has unhandled state:"
-            assert msg in models.RulebookProcessLog.objects.last().log
-
-
-@pytest.mark.django_db
-def test_get_job_pod(init_kubernetes_data, kubernetes_engine):
-    engine = kubernetes_engine
-    pods_mock = mock.Mock()
-    pod = get_pod("running")
-
-    with mock.patch.object(engine.client, "core_api") as core_api_mock:
-        core_api_mock.list_namespaced_pod.return_value = pods_mock
-        pods_mock.items = [pod]
-
-        job_pod = engine._get_job_pod("eda-pod")
-
-        assert job_pod is not None
-        assert job_pod == pod
-
-    with mock.patch.object(engine.client, "core_api") as core_api_mock:
-        core_api_mock.list_namespaced_pod.return_value = pods_mock
-        pods_mock.items = None
-
-        with pytest.raises(ContainerNotFoundError):
-            engine._get_job_pod("eda-pod")
-
-    def raise_api_error(*args, **kwargs):
-        raise ApiException("Container not found")
-
-    with mock.patch.object(engine.client, "core_api") as core_api_mock:
-        core_api_mock.list_namespaced_pod.side_effect = raise_api_error
-
-        with pytest.raises(ContainerNotFoundError):
-            engine._get_job_pod("eda-pod")
-
-
-@pytest.mark.django_db
-def test_create_service(
-    init_kubernetes_data,
-    kubernetes_engine,
-    default_organization: models.Organization,
-):
-    engine = kubernetes_engine
-    engine.job_name = "eda-job"
-    request = get_request(
-        init_kubernetes_data,
-        "admin",
-        default_organization,
-        k8s_service_name=init_kubernetes_data.activation.k8s_service_name,
-    )
-
-    with mock.patch.object(engine.client, "core_api") as core_api_mock:
-        core_api_mock.list_namespaced_service.return_value.items = None
-        engine._create_service(request)
-
-        core_api_mock.create_namespaced_service.assert_called_once()
-
-    def raise_api_error(*args, **kwargs):
-        raise ApiException("Service not found")
-
-    with mock.patch.object(engine.client, "core_api") as core_api_mock:
-        core_api_mock.list_namespaced_service.side_effect = raise_api_error
-
-        with pytest.raises(ContainerStartError):
-            engine._create_service(request)
-
-
-@pytest.mark.django_db
-def test_create_secret(
-    init_kubernetes_data,
-    kubernetes_engine,
-    default_organization: models.Organization,
-):
-    engine = kubernetes_engine
-    engine.job_name = "eda-job"
-    request = get_request(
-        init_kubernetes_data,
-        "admin",
-        default_organization,
-        k8s_service_name=init_kubernetes_data.activation.k8s_service_name,
-    )
-    log_handler = DBLogger(init_kubernetes_data.activation_instance.id)
-
-    def raise_api_error(*args, **kwargs):
-        raise ApiException("Secret create error")
-
-    with mock.patch.object(engine.client, "core_api") as core_api_mock:
-        core_api_mock.create_namespaced_secret.side_effect = raise_api_error
-
-        with pytest.raises(ContainerStartError):
-            engine._create_secret(request, log_handler)
-
-        core_api_mock.delete_namespaced_secret.assert_called_once()
+                engine.cleanup(job_name, log_handler)
+                secret_mock.assert_called_once()
+                services_mock.assert_called_once()
+                job_mock.assert_called_once()
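
Reviewer note: the patch folds the old `_wait_for_pod_to_be_deleted` helper into `_delete_job`, so the DELETED-event watch now runs only on the code path that actually deleted a Job, and `cleanup()` no longer performs a separate wait step. For anyone who has not used the kubernetes client's watch API, here is a minimal standalone sketch of the same pattern. It is illustrative only: the function name `wait_for_job_pod_deletion`, the 300-second timeout, and the `job-name` label selector value are assumptions for the example, not part of this patch.

    # Standalone sketch (not part of the patch) of the watch-until-DELETED
    # pattern used in _delete_job above. Assumes the official `kubernetes`
    # Python client; names and the timeout value are illustrative.
    from kubernetes import client, config, watch
    from kubernetes.client.rest import ApiException

    POD_DELETE_TIMEOUT = 300  # seconds; stand-in for the module constant

    def wait_for_job_pod_deletion(namespace: str, job_name: str) -> bool:
        """Return True once the Job's pod is observed as deleted or is
        already gone; False if the watch times out first."""
        core_api = client.CoreV1Api()
        watcher = watch.Watch()
        try:
            # stream() yields dicts of the form
            # {"type": "ADDED" | "MODIFIED" | "DELETED", "object": V1Pod}
            # and ends on its own after timeout_seconds.
            for event in watcher.stream(
                core_api.list_namespaced_pod,
                namespace=namespace,
                label_selector=f"job-name={job_name}",
                timeout_seconds=POD_DELETE_TIMEOUT,
            ):
                if event["type"] == "DELETED":
                    return True
            return False  # watch expired without a DELETED event
        except ApiException as e:
            if e.status == 404:  # resource already gone: treat as deleted
                return True
            raise
        finally:
            watcher.stop()  # always release the underlying HTTP connection

    if __name__ == "__main__":
        config.load_kube_config()  # load_incluster_config() inside a pod
        print(wait_for_job_pod_deletion("default", "eda-job"))

The `finally: watcher.stop()` mirrors the hunk above; without it the watch can leave its HTTP response open. The updated `test_delete_job` exercises this path the same way: it patches `watch.Watch` and feeds `stream()` a single canned DELETED event.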