From c5a2bf3d875a87aa4f99b1a2b137e5ac5efff094 Mon Sep 17 00:00:00 2001 From: Tobias Persson Date: Mon, 2 Dec 2024 13:55:17 +0100 Subject: [PATCH] Get environments from Kubernetes --- .../src/etos_suite_runner/esr.py | 8 +- .../etos_suite_runner/lib/esr_parameters.py | 2 +- .../src/etos_suite_runner/lib/executor.py | 2 +- .../src/etos_suite_runner/lib/suite.py | 86 +++++++++++++++---- 4 files changed, 77 insertions(+), 21 deletions(-) diff --git a/projects/etos_suite_runner/src/etos_suite_runner/esr.py b/projects/etos_suite_runner/src/etos_suite_runner/esr.py index 613bb96..806e16a 100644 --- a/projects/etos_suite_runner/src/etos_suite_runner/esr.py +++ b/projects/etos_suite_runner/src/etos_suite_runner/esr.py @@ -110,7 +110,7 @@ def __environment_request_status(self) -> None: reason = condition.get("reason", "").lower() if status == "false" and reason == "failed": failed.append(condition) - if status == "false" and reason == "done": + if status == "true" and reason == "done": success.append(condition) if found and len(failed) > 0: for condition in failed: @@ -122,14 +122,13 @@ def __environment_request_status(self) -> None: ) break if found and len(success) == len(requests): - self.params.set_status( - "SUCCESS", "Successfully created an environment for test" - ) + self.params.set_status("SUCCESS", None) self.logger.info( "Environment provider has finished creating an environment for test.", extra={"user_log": True}, ) break + self.logger.info("Environmentrequest finished") def __request_environment(self, ids: list[str]) -> None: """Request an environment from the environment provider. @@ -175,6 +174,7 @@ def _request_environment(self, ids: list[str], otel_context_carrier: dict) -> No runners. :param otel_context_carrier: a dict carrying current OpenTelemetry context. """ + FORMAT_CONFIG.identifier = self.params.testrun_id # OpenTelemetry contexts aren't propagated to threads automatically. # For this reason otel_context needs to be reinstantiated due to # this method running in a separate thread. diff --git a/projects/etos_suite_runner/src/etos_suite_runner/lib/esr_parameters.py b/projects/etos_suite_runner/src/etos_suite_runner/lib/esr_parameters.py index f6e1fb3..b2de9c9 100644 --- a/projects/etos_suite_runner/src/etos_suite_runner/lib/esr_parameters.py +++ b/projects/etos_suite_runner/src/etos_suite_runner/lib/esr_parameters.py @@ -44,7 +44,7 @@ def __init__(self, etos: ETOS) -> None: self.issuer = {"name": "ETOS Suite Runner"} self.environment_status = {"status": "NOT_STARTED", "error": None} - def set_status(self, status: str, error: str) -> None: + def set_status(self, status: str, error: Optional[str] = None) -> None: """Set environment provider status.""" with self.lock: self.logger.debug("Setting environment status to %r, error %r", status, error) diff --git a/projects/etos_suite_runner/src/etos_suite_runner/lib/executor.py b/projects/etos_suite_runner/src/etos_suite_runner/lib/executor.py index 4210264..186052f 100644 --- a/projects/etos_suite_runner/src/etos_suite_runner/lib/executor.py +++ b/projects/etos_suite_runner/src/etos_suite_runner/lib/executor.py @@ -64,7 +64,7 @@ def __decrypt(self, password: Union[str, dict]) -> str: self.logger.debug("No encryption key available, won't decrypt password") return password password_value = password.get("$decrypt", {}).get("value") - if password_value is None: + if password_value is None or password_value == "": self.logger.debug("No '$decrypt' JSONTas struct for password, won't decrypt password") return password return Fernet(key).decrypt(password_value).decode() diff --git a/projects/etos_suite_runner/src/etos_suite_runner/lib/suite.py b/projects/etos_suite_runner/src/etos_suite_runner/lib/suite.py index 3c37f3a..25c053b 100644 --- a/projects/etos_suite_runner/src/etos_suite_runner/lib/suite.py +++ b/projects/etos_suite_runner/src/etos_suite_runner/lib/suite.py @@ -21,7 +21,7 @@ import time from typing import Iterator, Union -from eiffellib.events import EiffelTestSuiteStartedEvent +from eiffellib.events import EiffelTestSuiteStartedEvent, EiffelEnvironmentDefinedEvent from environment_provider.lib.registry import ProviderRegistry from environment_provider.environment import release_environment from etos_lib import ETOS @@ -268,16 +268,11 @@ def __del__(self): """Destructor.""" opentelemetry.context.detach(self.otel_context_token) - @property - def sub_suite_environments(self) -> Iterator[dict]: + def _sub_suite_environments_from_eiffel(self) -> Iterator[dict]: """All sub suite environments from the environment provider. Each sub suite environment is an environment for the sub suites to execute in. """ - self.logger.debug( - "Start collecting sub suite definitions (timeout=%ds).", - self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT"), - ) environments = [] timeout = time.time() + self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT") while time.time() < timeout: @@ -300,7 +295,13 @@ def sub_suite_environments(self) -> Iterator[dict]: ): if environment["meta"]["id"] not in environments: environments.append(environment["meta"]["id"]) - yield environment + sub_suite_definition = self._download_sub_suite(environment) + if sub_suite_definition is None: + raise EnvironmentProviderException( + "URL to sub suite is missing", self.etos.config.get("task_id") + ) + sub_suite_definition["id"] = environment["meta"]["id"] + yield sub_suite_definition if activity_finished is not None: if activity_finished["data"]["activityOutcome"]["conclusion"] != "SUCCESSFUL": exc = EnvironmentProviderException( @@ -318,6 +319,67 @@ def sub_suite_environments(self) -> Iterator[dict]: self._record_exception(exc) raise exc + def _sub_suite_environments_from_kubernetes(self) -> Iterator[dict]: + """All sub suite environments from Kubernetes. + + Each sub suite environment is an environment for the sub suites to execute in. + """ + environments = [] + timeout = time.time() + self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT") + while time.time() < timeout: + time.sleep(5) + for environment in self.params.environments: + if environment.spec.sub_suite_id in environments: + continue + + # Send eiffel event for the ETR. + event = EiffelEnvironmentDefinedEvent() + event.meta.event_id = environment.metadata.name + url = f"{os.getenv('ETOS_API')}/v1alpha/testrun/{environment.metadata.name}" + self.etos.events.send( + event, + {"CONTEXT": self.test_suite_started_id}, + {"name": environment.spec.name, "uri": url}, + ) + + environments.append(environment.spec.sub_suite_id) + executor = environment.spec.executor + yield { + "executor": executor, + "id": environment.spec.sub_suite_id, + "name": environment.spec.name, + } + status = self.params.get_status() + if status.get("status") == "FAILURE": + exc = EnvironmentProviderException( + status.get("error"), self.etos.config.get("task_id") + ) + self._record_exception(exc) + raise exc + if status.get("status") == "SUCCESS" and len(environments) > 0: + return + else: # pylint:disable=useless-else-on-loop + exc = TimeoutError( + f"Timed out after {self.etos.config.get('WAIT_FOR_ENVIRONMENT_TIMEOUT')} seconds." + ) + self._record_exception(exc) + raise exc + + @property + def sub_suite_environments(self) -> Iterator[dict]: + """All sub suite environments from the environment provider. + + Each sub suite environment is an environment for the sub suites to execute in. + """ + self.logger.debug( + "Start collecting sub suite definitions (timeout=%ds).", + self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT"), + ) + if os.getenv("IDENTIFIER") is not None: + yield from self._sub_suite_environments_from_kubernetes() + else: + yield from self._sub_suite_environments_from_eiffel() + @property def all_finished(self) -> bool: """Whether or not all sub suites are finished.""" @@ -400,16 +462,10 @@ def _start(self): self.test_suite_started.meta.event_id, extra={"user_log": True}, ) - for sub_suite_environment in self.sub_suite_environments: + for sub_suite_definition in self.sub_suite_environments: self.logger.info( "Environment received. Starting up a sub suite", extra={"user_log": True} ) - sub_suite_definition = self._download_sub_suite(sub_suite_environment) - if sub_suite_definition is None: - raise EnvironmentProviderException( - "URL to sub suite is missing", self.etos.config.get("task_id") - ) - sub_suite_definition["id"] = sub_suite_environment["meta"]["id"] sub_suite = SubSuite(self.etos, sub_suite_definition, self.test_suite_started_id) self.sub_suites.append(sub_suite) thread = threading.Thread(