diff --git a/.github/workflows/e2e_test.yaml b/.github/workflows/e2e_test.yaml index 5933451ee..7d0383c12 100644 --- a/.github/workflows/e2e_test.yaml +++ b/.github/workflows/e2e_test.yaml @@ -2,6 +2,7 @@ name: End-to-End tests on: pull_request: + jobs: # test option values defined at test/conftest.py are passed on via repository secret diff --git a/.github/workflows/e2e_test_run.yaml b/.github/workflows/e2e_test_run.yaml index 1ecc238e1..4eeeccbe1 100644 --- a/.github/workflows/e2e_test_run.yaml +++ b/.github/workflows/e2e_test_run.yaml @@ -36,6 +36,8 @@ jobs: name: End-to-End Test Run runs-on: [self-hosted, linux, "${{ inputs.runner-tag }}"] steps: + - name: Hostname is set to "github-runner" + run: sudo hostnamectl hostname | grep github-runner # Snapd can have some issues in privileged LXD containers without setting # security.nesting=True and this. - name: Fix snap issue in privileged LXD containers diff --git a/.github/workflows/integration_test.yaml b/.github/workflows/integration_test.yaml index 107c87d42..6c968bd00 100644 --- a/.github/workflows/integration_test.yaml +++ b/.github/workflows/integration_test.yaml @@ -24,8 +24,8 @@ jobs: # # test_debug_ssh ensures tmate SSH actions works. # # TODO: Add OpenStack integration versions of these tests. 
# modules: '["test_charm_scheduled_events", "test_debug_ssh"]' - # openstack-integration-tests-private-endpoint-amd64: - # name: Integration test using private-endpoint(AMD64) + # openstack-interface-tests-private-endpoint: + # name: openstack interface test using private-endpoint # uses: canonical/operator-workflows/.github/workflows/integration_test.yaml@main # secrets: inherit # with: @@ -33,6 +33,19 @@ jobs: # pre-run-script: scripts/setup-lxd.sh # provider: lxd # test-tox-env: integration-juju3.2 + # modules: '["test_runner_manager_openstack"]' + # self-hosted-runner: true + # self-hosted-runner-label: stg-private-endpoint + # openstack-integration-tests-private-endpoint: + # name: Integration test using private-endpoint + # uses: canonical/operator-workflows/.github/workflows/integration_test.yaml@main + # needs: openstack-interface-tests-private-endpoint + # secrets: inherit + # with: + # juju-channel: 3.2/stable + # pre-run-script: scripts/setup-lxd.sh + # provider: lxd + # test-tox-env: integration-juju3.2 # modules: '["test_charm_metrics_failure", "test_charm_metrics_success", "test_charm_fork_repo", "test_charm_runner", "test_reactive"]' # extra-arguments: "-m openstack" # self-hosted-runner: true diff --git a/pyproject.toml b/pyproject.toml index e7d0f789f..f4a49bd2a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,9 @@ skips = ["*/*test.py", "*/test_*.py", "*tests/*.py"] [tool.coverage.run] branch = true omit = [ + # These are covered by `tests/integration/test_runner_manager_openstack.py`. + "src/openstack_cloud/openstack_cloud.py", + "src/openstack_cloud/openstack_runner_manager.py", # Contains interface for calling LXD. Tested in integration tests and end to end tests. "src/lxd.py", # Contains interface for calling repo policy compliance service. 
Tested in integration test @@ -54,9 +57,9 @@ max-doc-length = 99 max-complexity = 10 exclude = [".git", "__pycache__", ".tox", "build", "dist", "*.egg_info", "venv"] select = ["E", "W", "F", "C", "N", "R", "D", "H"] -# Ignore W503, E501 because using black creates errors with this +# Ignore W503, E501, E203 because using black creates errors with this # Ignore D107 Missing docstring in __init__ -ignore = ["W503", "E501", "D107"] +ignore = ["W503", "E501", "D107", "E203"] # D100, D101, D102, D103, D104: Ignore docstring style issues in tests # temporary disable E402 for the fix in charm.py for lp:2058335 per-file-ignores = ["src/charm.py:E402", "tests/*:D100,D101,D102,D103,D104,D205,D212"] diff --git a/src-docs/errors.md b/src-docs/errors.md index c0f190a73..cf7cde565 100644 --- a/src-docs/errors.md +++ b/src-docs/errors.md @@ -403,3 +403,25 @@ Represents an unauthorized connection to OpenStack. +--- + + + +## class `SSHError` +Represents an error while interacting with SSH. + + + + + +--- + + + +## class `KeyfileError` +Represents missing keyfile for SSH. + + + + + diff --git a/src-docs/managed_requests.md b/src-docs/managed_requests.md new file mode 100644 index 000000000..23939bcc2 --- /dev/null +++ b/src-docs/managed_requests.md @@ -0,0 +1,32 @@ + + + + +# module `managed_requests` +Get configured requests session instance + + +--- + + + +## function `get_requests_session` + +```python +get_requests_session(proxy: ProxyConfig) → Session +``` + +Get managed requests session instance. + + + +**Args:** + + - `proxy`: HTTP proxy configurations. + + + +**Returns:** + Requests session with proxy and retry setup. 
+ + diff --git a/src-docs/metrics.runner.md b/src-docs/metrics.runner.md index edf8f0a65..269d2581a 100644 --- a/src-docs/metrics.runner.md +++ b/src-docs/metrics.runner.md @@ -21,7 +21,8 @@ Classes and function to extract the metrics from storage and issue runner metric ```python extract( metrics_storage_manager: StorageManager, - ignore_runners: set[str] + runners: set[str], + include: bool = False ) → Iterator[RunnerMetrics] ``` @@ -38,7 +39,8 @@ In order to avoid DoS attacks, the file size is also checked. **Args:** - `metrics_storage_manager`: The metrics storage manager. - - `ignore_runners`: The set of runners to ignore. + - `runners`: The runners to include or exclude. + - `include`: If true the provided runners are included for metric extraction, else the provided runners are excluded. @@ -48,7 +50,7 @@ In order to avoid DoS attacks, the file size is also checked. --- - + ## function `issue_events` diff --git a/src-docs/openstack_cloud.openstack_cloud.md b/src-docs/openstack_cloud.openstack_cloud.md new file mode 100644 index 000000000..d49b62008 --- /dev/null +++ b/src-docs/openstack_cloud.openstack_cloud.md @@ -0,0 +1,246 @@ + + + + +# module `openstack_cloud.openstack_cloud` +Class for accessing OpenStack API for managing servers. + + + +--- + + + +## class `OpenstackInstance` +Represents an OpenStack instance. + + + +**Attributes:** + + - `server_id`: ID of server assigned by OpenStack. + - `server_name`: Name of the server on OpenStack. + - `instance_id`: ID used by OpenstackCloud class to manage the instances. See docs on the OpenstackCloud. + - `addresses`: IP addresses assigned to the server. + - `status`: Status of the server. + + + +### method `__init__` + +```python +__init__(server: Server, prefix: str) +``` + +Construct the object. + + + +**Args:** + + - `server`: The OpenStack server. + - `prefix`: The name prefix for the servers. + + + +**Raises:** + + - `ValueError`: Provided server should not be managed under this prefix. 
+ + + + + +--- + + + +## class `OpenstackCloud` +Client to interact with OpenStack cloud. + +The OpenStack server name is managed by this cloud. Caller refers to the instances via instance_id. If the caller needs the server name, e.g., for logging, it can be queried with get_server_name. + + + +### method `__init__` + +```python +__init__(clouds_config: dict[str, dict], cloud: str, prefix: str) +``` + +Create the object. + + + +**Args:** + + - `clouds_config`: The openstack clouds.yaml in dict format. + - `cloud`: The name of cloud to use in the clouds.yaml. + - `prefix`: Prefix attached to names of resource managed by this instance. Used for identifying which resource belongs to this instance. + + + + +--- + + + +### method `cleanup` + +```python +cleanup() → None +``` + +Cleanup unused key files and openstack keypairs. + +--- + + + +### method `delete_instance` + +```python +delete_instance(instance_id: str) → None +``` + +Delete a openstack instance. + + + +**Args:** + + - `instance_id`: The instance ID of the instance to delete. + +--- + + + +### method `get_instance` + +```python +get_instance(instance_id: str) → OpenstackInstance | None +``` + +Get OpenStack instance by instance ID. + + + +**Args:** + + - `instance_id`: The instance ID. + + + +**Returns:** + The OpenStack instance if found. + +--- + + + +### method `get_instances` + +```python +get_instances() → tuple[OpenstackInstance, ] +``` + +Get all OpenStack instances. + + + +**Returns:** + The OpenStack instances. + +--- + + + +### method `get_server_name` + +```python +get_server_name(instance_id: str) → str +``` + +Get server name on OpenStack. + + + +**Args:** + + - `instance_id`: ID used to identify a instance. + + + +**Returns:** + The OpenStack server name. + +--- + + + +### method `get_ssh_connection` + +```python +get_ssh_connection(instance: OpenstackInstance) → Connection +``` + +Get SSH connection to an OpenStack instance. 
+ + + +**Args:** + + - `instance`: The OpenStack instance to connect to. + + + +**Raises:** + + - `SSHError`: Unable to get a working SSH connection to the instance. + - `KeyfileError`: Unable to find the keyfile to connect to the instance. + + + +**Returns:** + SSH connection object. + +--- + + + +### method `launch_instance` + +```python +launch_instance( + instance_id: str, + image: str, + flavor: str, + network: str, + cloud_init: str +) → OpenstackInstance +``` + +Create an OpenStack instance. + + + +**Args:** + + - `instance_id`: The instance ID to form the instance name. + - `image`: The image used to create the instance. + - `flavor`: The flavor used to create the instance. + - `network`: The network used to create the instance. + - `cloud_init`: The cloud init userdata to startup the instance. + + + +**Raises:** + + - `OpenStackError`: Unable to create OpenStack server. + + + +**Returns:** + The OpenStack instance created. + + diff --git a/src-docs/openstack_cloud.openstack_manager.md b/src-docs/openstack_cloud.openstack_manager.md index 697d3d96a..a0f0a2531 100644 --- a/src-docs/openstack_cloud.openstack_manager.md +++ b/src-docs/openstack_cloud.openstack_manager.md @@ -93,7 +93,7 @@ __init__( --- - + ## class `GithubRunnerRemoveError` Represents an error removing registered runner from Github. @@ -104,7 +104,7 @@ Represents an error removing registered runner from Github. --- - + ## class `OpenstackRunnerManager` Runner manager for OpenStack-based instances. @@ -117,7 +117,7 @@ Runner manager for OpenStack-based instances. - `unit_num`: The juju unit number. - `instance_name`: Prefix of the name for the set of runners. - + ### method `__init__` @@ -146,24 +146,32 @@ Construct OpenstackRunnerManager object. --- - + ### method `flush` ```python -flush() → int +flush(mode: FlushMode = ) → int ``` Flush Openstack servers. +1. Kill the processes depending on flush mode. 2. Get unhealthy runners after process purging. 3. Delete unhealthy runners. 
+ + + +**Args:** + + - `mode`: The mode to determine which runner to flush. + **Returns:** - The number of runners flushed. + The number of runners flushed. --- - + ### method `get_github_runner_info` @@ -180,7 +188,7 @@ Get information on GitHub for the runners. --- - + ### method `reconcile` diff --git a/src-docs/openstack_cloud.openstack_runner_manager.md b/src-docs/openstack_cloud.openstack_runner_manager.md new file mode 100644 index 000000000..752e2f9d3 --- /dev/null +++ b/src-docs/openstack_cloud.openstack_runner_manager.md @@ -0,0 +1,279 @@ + + + + +# module `openstack_cloud.openstack_runner_manager` +Manager for self-hosted runner on OpenStack. + +**Global Variables** +--------------- +- **BUILD_OPENSTACK_IMAGE_SCRIPT_FILENAME** +- **MAX_METRICS_FILE_SIZE** +- **RUNNER_STARTUP_PROCESS** +- **RUNNER_LISTENER_PROCESS** +- **RUNNER_WORKER_PROCESS** +- **CREATE_SERVER_TIMEOUT** + + +--- + + + +## class `OpenStackCloudConfig` +Configuration for OpenStack cloud authorisation information. + + + +**Attributes:** + + - `clouds_config`: The clouds.yaml. + - `cloud`: The cloud name to connect to. + + + +### method `__init__` + +```python +__init__(clouds_config: dict[str, dict], cloud: str) → None +``` + + + + + + + + + +--- + + + +## class `OpenStackServerConfig` +Configuration for OpenStack server. + + + +**Attributes:** + + - `image`: The image name for runners to use. + - `flavor`: The flavor name for runners to use. + - `network`: The network name for runners to use. + + + +### method `__init__` + +```python +__init__(image: str, flavor: str, network: str) → None +``` + + + + + + + + + +--- + + + +## class `OpenstackRunnerManager` +Manage self-hosted runner on OpenStack cloud. + + + +**Attributes:** + + - `name_prefix`: The name prefix of the runners created. 
+ + + +### method `__init__` + +```python +__init__( + prefix: str, + cloud_config: OpenStackCloudConfig, + server_config: OpenStackServerConfig, + runner_config: GitHubRunnerConfig, + service_config: SupportServiceConfig +) → None +``` + +Construct the object. + + + +**Args:** + + - `prefix`: The prefix to runner name. + - `cloud_config`: The configuration for OpenStack authorisation. + - `server_config`: The configuration for creating OpenStack server. + - `runner_config`: The configuration for the runner. + - `service_config`: The configuration of supporting services of the runners. + + +--- + +#### property name_prefix + +The prefix of runner names. + + + +**Returns:** + The prefix of the runner names managed by this class. + + + +--- + + + +### method `cleanup` + +```python +cleanup(remove_token: str) → Iterator[RunnerMetrics] +``` + +Cleanup runner and resource on the cloud. + + + +**Args:** + + - `remove_token`: The GitHub remove token. + + + +**Returns:** + Any metrics retrieved from cleanup runners. + +--- + + + +### method `create_runner` + +```python +create_runner(registration_token: str) → str +``` + +Create a self-hosted runner. + + + +**Args:** + + - `registration_token`: The GitHub registration token for registering runners. + + + +**Raises:** + + - `RunnerCreateError`: Unable to create runner due to OpenStack issues. + + + +**Returns:** + Instance ID of the runner. + +--- + + + +### method `delete_runner` + +```python +delete_runner(instance_id: str, remove_token: str) → RunnerMetrics | None +``` + +Delete self-hosted runners. + + + +**Args:** + + - `instance_id`: The instance id of the runner to delete. + - `remove_token`: The GitHub remove token. + + + +**Returns:** + Any metrics collected during the deletion of the runner. + +--- + + + +### method `flush_runners` + +```python +flush_runners(remove_token: str, busy: bool = False) → Iterator[RunnerMetrics] +``` + +Remove idle and/or busy runners. 
+ + + +**Args:** + remove_token: + - `busy`: If false, only idle runners are removed. If true, both idle and busy runners are removed. + + + +**Returns:** + Any metrics retrieved from flushed runners. + +--- + + + +### method `get_runner` + +```python +get_runner(instance_id: str) → CloudRunnerInstance | None +``` + +Get a self-hosted runner by instance id. + + + +**Args:** + + - `instance_id`: The instance id. + + + +**Returns:** + Information on the runner instance. + +--- + + + +### method `get_runners` + +```python +get_runners( + states: Optional[Sequence[CloudRunnerState]] = None +) → tuple[CloudRunnerInstance, ] +``` + +Get self-hosted runners by state. + + + +**Args:** + + - `states`: Filter for the runners with these github states. If None all states will be included. + + + +**Returns:** + Information on the runner instances. + + diff --git a/src-docs/runner_manager_type.md b/src-docs/runner_manager_type.md index c3509b433..f6dd4faae 100644 --- a/src-docs/runner_manager_type.md +++ b/src-docs/runner_manager_type.md @@ -14,6 +14,8 @@ Types used by RunnerManager class. ## class `FlushMode` Strategy for flushing runners. +During pre-job (repo-check), the runners are marked as idle and if the pre-job fails, the runner falls back to being idle again. Hence wait_repo_check is required. + **Attributes:** @@ -30,7 +32,7 @@ Strategy for flushing runners. --- - + ## class `RunnerManagerClients` Clients for accessing various services. @@ -67,7 +69,7 @@ __init__( --- - + ## class `RunnerManagerConfig` Configuration of runner manager. @@ -119,7 +121,7 @@ Whether metrics for the runners should be collected. --- - + ## class `OpenstackRunnerManagerConfig` Configuration of runner manager. @@ -166,7 +168,7 @@ __init__( --- - + ## class `RunnerInfo` Information from GitHub of a runner. 
diff --git a/src-docs/runner_type.md b/src-docs/runner_type.md index 481a17a62..8c9db658a 100644 --- a/src-docs/runner_type.md +++ b/src-docs/runner_type.md @@ -11,7 +11,7 @@ Types used by Runner class. -## class `RunnerByHealth` +## class `RunnerNameByHealth` Set of runners instance by health state. diff --git a/src/errors.py b/src/errors.py index 55d84e8e1..59d28a239 100644 --- a/src/errors.py +++ b/src/errors.py @@ -166,3 +166,11 @@ class OpenStackInvalidConfigError(OpenStackError): class OpenStackUnauthorizedError(OpenStackError): """Represents an unauthorized connection to OpenStack.""" + + +class SSHError(Exception): + """Represents an error while interacting with SSH.""" + + +class KeyfileError(SSHError): + """Represents missing keyfile for SSH.""" diff --git a/src/manager/cloud_runner_manager.py b/src/manager/cloud_runner_manager.py new file mode 100644 index 000000000..28ed17b20 --- /dev/null +++ b/src/manager/cloud_runner_manager.py @@ -0,0 +1,204 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Interface of manager of runner instance on clouds.""" + +import abc +import logging +from dataclasses import dataclass +from enum import Enum, auto +from typing import Iterator, Sequence, Tuple + +from charm_state import GithubPath, ProxyConfig, SSHDebugConnection +from metrics.runner import RunnerMetrics + +logger = logging.getLogger(__name__) + +InstanceId = str + + +class HealthState(Enum): + """Health state of the runners. + + Attributes: + HEALTHY: The runner is healthy. + UNHEALTHY: The runner is not healthy. + UNKNOWN: Unable to get the health state. + """ + + HEALTHY = auto() + UNHEALTHY = auto() + UNKNOWN = auto() + + +class CloudRunnerState(str, Enum): + """Represent state of the instance hosting the runner. + + Attributes: + CREATED: The instance is created. + ACTIVE: The instance is active and running. + DELETED: The instance is deleted. + ERROR: The instance has encountered error and not running. 
+ STOPPED: The instance has stopped. + UNKNOWN: The state of the instance is not known. + UNEXPECTED: An unknown state not accounted by the developer is encountered. + """ + + CREATED = auto() + ACTIVE = auto() + DELETED = auto() + ERROR = auto() + STOPPED = auto() + UNKNOWN = auto() + UNEXPECTED = auto() + + @staticmethod + def from_openstack_server_status( + openstack_server_status: str, + ) -> "CloudRunnerState": + """Create from openstack server status. + + The openstack server status are documented here: + https://docs.openstack.org/api-guide/compute/server_concepts.html + + Args: + openstack_server_status: Openstack server status. + + Returns: + The state of the runner. + """ + state = CloudRunnerState.UNEXPECTED + match openstack_server_status: + case "BUILD": + state = CloudRunnerState.CREATED + case "REBUILD": + state = CloudRunnerState.CREATED + case "ACTIVE": + state = CloudRunnerState.ACTIVE + case "ERROR": + state = CloudRunnerState.ERROR + case "STOPPED": + state = CloudRunnerState.STOPPED + case "DELETED": + state = CloudRunnerState.DELETED + case "UNKNOWN": + state = CloudRunnerState.UNKNOWN + case _: + state = CloudRunnerState.UNEXPECTED + return state + + +@dataclass +class GitHubRunnerConfig: + """Configuration for GitHub runner spawned. + + Attributes: + github_path: The GitHub organization or repository for runners to connect to. + labels: The labels to add to runners. + """ + + github_path: GithubPath + labels: list[str] + + +@dataclass +class SupportServiceConfig: + """Configuration for supporting services for runners. + + Attributes: + proxy_config: The proxy configuration. + dockerhub_mirror: The dockerhub mirror to use for runners. + ssh_debug_connections: The information on the ssh debug services. + repo_policy_url: The URL of the repo policy service. + repo_policy_token: The token to access the repo policy service. 
+ """ + + proxy_config: ProxyConfig | None + dockerhub_mirror: str | None + ssh_debug_connections: list[SSHDebugConnection] | None + repo_policy_url: str | None + repo_policy_token: str | None + + +@dataclass +class CloudRunnerInstance: + """Information on the runner on the cloud. + + Attributes: + name: Name of the instance hosting the runner. + instance_id: ID of the instance. + health: Health state of the runner. + state: State of the instance hosting the runner. + """ + + name: str + instance_id: InstanceId + health: HealthState + state: CloudRunnerState + + +class CloudRunnerManager(abc.ABC): + """Manage runner instance on cloud. + + Attributes: + name_prefix: The name prefix of the self-hosted runners. + """ + + @property + @abc.abstractmethod + def name_prefix(self) -> str: + """Get the name prefix of the self-hosted runners.""" + + @abc.abstractmethod + def create_runner(self, registration_token: str) -> InstanceId: + """Create a self-hosted runner. + + Args: + registration_token: The GitHub registration token for registering runners. + """ + + @abc.abstractmethod + def get_runner(self, instance_id: InstanceId) -> CloudRunnerInstance: + """Get a self-hosted runner by instance id. + + Args: + instance_id: The instance id. + """ + + @abc.abstractmethod + def get_runners(self, states: Sequence[CloudRunnerState]) -> Tuple[CloudRunnerInstance]: + """Get self-hosted runners by state. + + Args: + states: Filter for the runners with these github states. If None all states will be + included. + """ + + @abc.abstractmethod + def delete_runner(self, instance_id: InstanceId, remove_token: str) -> RunnerMetrics | None: + """Delete self-hosted runner. + + Args: + instance_id: The instance id of the runner to delete. + remove_token: The GitHub remove token. + """ + + @abc.abstractmethod + def flush_runners(self, remove_token: str, busy: bool = False) -> Iterator[RunnerMetrics]: + """Stop all runners. + + Args: + remove_token: The GitHub remove token for removing runners. 
+ busy: If false, only idle runners are removed. If true, both idle and busy runners are + removed. + """ + + @abc.abstractmethod + def cleanup(self, remove_token: str) -> Iterator[RunnerMetrics]: + """Cleanup runner and resource on the cloud. + + Perform health check on runner and delete the runner if it fails. + + Args: + remove_token: The GitHub remove token for removing runners. + """ diff --git a/src/manager/github_runner_manager.py b/src/manager/github_runner_manager.py new file mode 100644 index 000000000..0aed972bd --- /dev/null +++ b/src/manager/github_runner_manager.py @@ -0,0 +1,127 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Client for managing self-hosted runner on GitHub side.""" + +from enum import Enum, auto +from typing import Sequence + +from charm_state import GithubPath +from github_client import GithubClient +from github_type import GitHubRunnerStatus, SelfHostedRunner + + +class GitHubRunnerState(str, Enum): + """State of the self-hosted runner on GitHub. + + Attributes: + BUSY: Runner is working on a job assigned by GitHub. + IDLE: Runner is waiting to take a job or is running pre-job tasks (i.e. + repo-policy-compliance check). + OFFLINE: Runner is not connected to GitHub. + """ + + BUSY = auto() + IDLE = auto() + OFFLINE = auto() + + @staticmethod + def from_runner(runner: SelfHostedRunner) -> "GitHubRunnerState": + """Construct the object from GtiHub runner information. + + Args: + runner: Information on the GitHub self-hosted runner. + + Returns: + The state of runner. + """ + state = GitHubRunnerState.OFFLINE + # A runner that is busy and offline is possible. 
+ if runner.busy: + state = GitHubRunnerState.BUSY + if runner.status == GitHubRunnerStatus.ONLINE: + if not runner.busy: + state = GitHubRunnerState.IDLE + return state + + +class GithubRunnerManager: + """Manage self-hosted runner on GitHub side.""" + + def __init__(self, prefix: str, token: str, path: GithubPath): + """Construct the object. + + Args: + prefix: The prefix in the name to identify the runners managed by this instance. + token: The GitHub personal access token to access the GitHub API. + path: The GitHub repository or organization to register the runners under. + """ + self._prefix = prefix + self._path = path + self.github = GithubClient(token) + + def get_runners( + self, states: Sequence[GitHubRunnerState] | None = None + ) -> tuple[SelfHostedRunner]: + """Get info on self-hosted runners of certain states. + + Args: + states: Filter the runners for these states. If None, all runners are returned. + + Returns: + Information on the runners. + """ + runner_list = self.github.get_runner_github_info(self._path) + return tuple( + runner + for runner in runner_list + if runner.name.startswith(self._prefix) + and GithubRunnerManager._is_runner_in_state(runner, states) + ) + + def delete_runners(self, states: Sequence[GitHubRunnerState] | None = None) -> None: + """Delete the self-hosted runners of certain states. + + Args: + states: Filter the runners for these states. If None, all runners are deleted. + """ + runner_list = self.get_runners(states) + for runner in runner_list: + self.github.delete_runner(self._path, runner.id) + + def get_registration_token(self) -> str: + """Get registration token from GitHub. + + This token is used for registering self-hosted runners. + + Returns: + The registration token. + """ + return self.github.get_runner_registration_token(self._path) + + def get_removal_token(self) -> str: + """Get removal token from GitHub. + + This token is used for removing self-hosted runners. + + Returns: + The removal token. 
+ """ + return self.github.get_runner_remove_token(self._path) + + @staticmethod + def _is_runner_in_state( + runner: SelfHostedRunner, states: Sequence[GitHubRunnerState] | None + ) -> bool: + """Check that the runner is in one of the states provided. + + Args: + runner: Runner to filter. + states: States in which to check the runner belongs to. + + Returns: + True if the runner is in one of the state, else false. + """ + if states is None: + return True + return GitHubRunnerState.from_runner(runner) in states diff --git a/src/manager/runner_manager.py b/src/manager/runner_manager.py new file mode 100644 index 000000000..048b9c628 --- /dev/null +++ b/src/manager/runner_manager.py @@ -0,0 +1,337 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Class for managing the GitHub self-hosted runners hosted on cloud instances.""" + +import logging +from dataclasses import dataclass +from enum import Enum, auto +from multiprocessing import Pool +from typing import Iterator, Sequence, Type, cast + +from charm_state import GithubPath +from errors import GithubMetricsError, RunnerCreateError +from github_type import SelfHostedRunner +from manager.cloud_runner_manager import ( + CloudRunnerInstance, + CloudRunnerManager, + CloudRunnerState, + HealthState, + InstanceId, +) +from manager.github_runner_manager import GithubRunnerManager, GitHubRunnerState +from metrics import events as metric_events +from metrics import github as github_metrics +from metrics import runner as runner_metrics +from metrics.runner import RunnerMetrics + +logger = logging.getLogger(__name__) + +IssuedMetricEventsStats = dict[Type[metric_events.Event], int] + + +class FlushMode(Enum): + """Strategy for flushing runners. + + Attributes: + FLUSH_IDLE: Flush idle runners. + FLUSH_BUSY: Flush busy runners. + """ + + FLUSH_IDLE = auto() + FLUSH_BUSY = auto() + + +@dataclass +class RunnerInstance: + """Represents an instance of runner. 
+ + Attributes: + name: Full name of the runner. Managed by the cloud runner manager. + instance_id: ID of the runner. Managed by the runner manager. + health: The health state of the runner. + github_state: State on github. + cloud_state: State on cloud. + """ + + name: str + instance_id: InstanceId + health: HealthState + github_state: GitHubRunnerState | None + cloud_state: CloudRunnerState + + def __init__(self, cloud_instance: CloudRunnerInstance, github_info: SelfHostedRunner | None): + """Construct an instance. + + Args: + cloud_instance: Information on the cloud instance. + github_info: Information on the GitHub of the runner. + """ + self.name = cloud_instance.name + self.instance_id = cloud_instance.instance_id + self.health = cloud_instance.health + self.github_state = ( + GitHubRunnerState.from_runner(github_info) if github_info is not None else None + ) + self.cloud_state = cloud_instance.state + + +@dataclass +class RunnerManagerConfig: + """Configuration for the runner manager. + + Attributes: + token: GitHub personal access token to query GitHub API. + path: Path to GitHub repository or organization to registry the runners. + """ + + token: str + path: GithubPath + + +class RunnerManager: + """Manage the runners. + + Attributes: + name_prefix: The name prefix of the runners. + """ + + def __init__(self, cloud_runner_manager: CloudRunnerManager, config: RunnerManagerConfig): + """Construct the object. + + Args: + cloud_runner_manager: For managing the cloud instance of the runner. + config: Configuration of this class. + """ + self._config = config + self._cloud = cloud_runner_manager + self.name_prefix = self._cloud.name_prefix + self._github = GithubRunnerManager( + prefix=self.name_prefix, token=self._config.token, path=self._config.path + ) + + def create_runners(self, num: int) -> tuple[InstanceId]: + """Create runners. + + Args: + num: Number of runners to create. + + Returns: + List of instance ID of the runners. 
+ """ + logger.info("Creating %s runners", num) + registration_token = self._github.get_registration_token() + + create_runner_args = [ + RunnerManager._CreateRunnerArgs(self._cloud, registration_token) for _ in range(num) + ] + instance_id_list = [] + with Pool(processes=min(num, 10)) as pool: + jobs = pool.imap_unordered( + func=RunnerManager._create_runner, iterable=create_runner_args + ) + for _ in range(num): + try: + instance_id = next(jobs) + except RunnerCreateError: + logger.exception("Failed to spawn a runner.") + except StopIteration: + break + else: + instance_id_list.append(instance_id) + return tuple(instance_id_list) + + def get_runners( + self, + github_states: Sequence[GitHubRunnerState] | None = None, + cloud_states: Sequence[CloudRunnerState] | None = None, + ) -> tuple[RunnerInstance]: + """Get information on runner filter by state. + + Only runners that has cloud instance are returned. + + Args: + github_states: Filter for the runners with these github states. If None all + states will be included. + cloud_states: Filter for the runners with these cloud states. If None all states + will be included. + + Returns: + Information on the runners. 
+ """ + logger.info("Getting runners...") + github_infos = self._github.get_runners(github_states) + cloud_infos = self._cloud.get_runners(cloud_states) + github_infos_map = {info.name: info for info in github_infos} + cloud_infos_map = {info.name: info for info in cloud_infos} + logger.info( + "Found following runners: %s", cloud_infos_map.keys() | github_infos_map.keys() + ) + + runner_names = cloud_infos_map.keys() & github_infos_map.keys() + cloud_only = cloud_infos_map.keys() - runner_names + github_only = github_infos_map.keys() - runner_names + if cloud_only: + logger.warning( + "Found runner instance on cloud but not registered on GitHub: %s", cloud_only + ) + if github_only: + logger.warning( + "Found self-hosted runner on GitHub but no matching runner instance on cloud: %s", + github_only, + ) + + runner_instances: list[RunnerInstance] = [ + RunnerInstance( + cloud_infos_map[name], github_infos_map[name] if name in github_infos_map else None + ) + for name in cloud_infos_map.keys() + ] + if cloud_states is not None: + runner_instances = [ + runner for runner in runner_instances if runner.cloud_state in cloud_states + ] + if github_states is not None: + runner_instances = [ + runner + for runner in runner_instances + if runner.github_state is not None and runner.github_state in github_states + ] + return cast(tuple[RunnerInstance], tuple(runner_instances)) + + def delete_runners(self, num: int) -> IssuedMetricEventsStats: + """Delete runners. + + Args: + num: The number of runner to delete. + + Returns: + Stats on metrics events issued during the deletion of runners. 
+ """ + logger.info("Deleting %s number of runners", num) + runners_list = self.get_runners()[:num] + runner_names = [runner.name for runner in runners_list] + logger.info("Deleting runners: %s", runner_names) + remove_token = self._github.get_removal_token() + return self._delete_runners(runners=runners_list, remove_token=remove_token) + + def flush_runners( + self, flush_mode: FlushMode = FlushMode.FLUSH_IDLE + ) -> IssuedMetricEventsStats: + """Delete runners according to state. + + Args: + flush_mode: The type of runners affect by the deletion. + + Returns: + Stats on metrics events issued during the deletion of runners. + """ + match flush_mode: + case FlushMode.FLUSH_IDLE: + logger.info("Flushing idle runners...") + case FlushMode.FLUSH_BUSY: + logger.info("Flushing idle and busy runners...") + case _: + logger.critical( + "Unknown flush mode %s encountered, contact developers", flush_mode + ) + + busy = False + if flush_mode == FlushMode.FLUSH_BUSY: + busy = True + remove_token = self._github.get_removal_token() + stats = self._cloud.flush_runners(remove_token, busy) + return self._issue_runner_metrics(metrics=stats) + + def cleanup(self) -> IssuedMetricEventsStats: + """Run cleanup of the runners and other resources. + + Returns: + Stats on metrics events issued during the cleanup of runners. + """ + self._github.delete_runners([GitHubRunnerState.OFFLINE]) + remove_token = self._github.get_removal_token() + deleted_runner_metrics = self._cloud.cleanup(remove_token) + return self._issue_runner_metrics(metrics=deleted_runner_metrics) + + def _delete_runners( + self, runners: Sequence[RunnerInstance], remove_token: str + ) -> IssuedMetricEventsStats: + """Delete list of runners. + + Args: + runners: The runners to delete. + remove_token: The token for removing self-hosted runners. + + Returns: + Stats on metrics events issued during the deletion of runners. 
+ """ + runner_metrics_list = [] + for runner in runners: + deleted_runner_metrics = self._cloud.delete_runner( + instance_id=runner.instance_id, remove_token=remove_token + ) + if deleted_runner_metrics is not None: + runner_metrics_list.append(deleted_runner_metrics) + return self._issue_runner_metrics(metrics=iter(runner_metrics_list)) + + def _issue_runner_metrics(self, metrics: Iterator[RunnerMetrics]) -> IssuedMetricEventsStats: + """Issue runner metrics. + + Args: + metrics: Runner metrics to issue. + + Returns: + Stats on runner metrics issued. + """ + total_stats: IssuedMetricEventsStats = {} + + for extracted_metrics in metrics: + try: + job_metrics = github_metrics.job( + github_client=self._github.github, + pre_job_metrics=extracted_metrics.pre_job, + runner_name=extracted_metrics.runner_name, + ) + except GithubMetricsError: + logger.exception( + "Failed to calculate job metrics for %s", extracted_metrics.runner_name + ) + job_metrics = None + + issued_events = runner_metrics.issue_events( + runner_metrics=extracted_metrics, + job_metrics=job_metrics, + flavor=self.name_prefix, + ) + + for event_type in issued_events: + total_stats[event_type] = total_stats.get(event_type, 0) + 1 + + return total_stats + + @dataclass + class _CreateRunnerArgs: + """Arguments for the _create_runner function. + + Attrs: + cloud_runner_manager: For managing the cloud instance of the runner. + registration_token: The GitHub provided-token for registering runners. + """ + + cloud_runner_manager: CloudRunnerManager + registration_token: str + + @staticmethod + def _create_runner(args: _CreateRunnerArgs) -> InstanceId: + """Create a single runner. + + This is a staticmethod for usage with multiprocess.Pool. + + Args: + args: The arguments. + + Returns: + The instance ID of the runner created. 
+ """ + return args.cloud_runner_manager.create_runner(registration_token=args.registration_token) diff --git a/src/metrics/runner.py b/src/metrics/runner.py index dfdf11044..b0ccc191a 100644 --- a/src/metrics/runner.py +++ b/src/metrics/runner.py @@ -105,7 +105,7 @@ class RunnerMetrics(BaseModel): def extract( - metrics_storage_manager: MetricsStorageManager, ignore_runners: set[str] + metrics_storage_manager: MetricsStorageManager, runners: set[str], include: bool = False ) -> Iterator[RunnerMetrics]: """Extract metrics from runners. @@ -120,13 +120,17 @@ def extract( Args: metrics_storage_manager: The metrics storage manager. - ignore_runners: The set of runners to ignore. + runners: The runners to include or exclude. + include: If true the provided runners are included for metric extraction, else the provided + runners are excluded. Yields: Extracted runner metrics of a particular runner. """ for ms in metrics_storage_manager.list_all(): - if ms.runner_name not in ignore_runners: + if (include and ms.runner_name in runners) or ( + not include and ms.runner_name not in runners + ): runner_metrics = _extract_storage( metrics_storage_manager=metrics_storage_manager, metrics_storage=ms ) diff --git a/src/openstack_cloud/openstack_cloud.py b/src/openstack_cloud/openstack_cloud.py new file mode 100644 index 000000000..ad21f4d97 --- /dev/null +++ b/src/openstack_cloud/openstack_cloud.py @@ -0,0 +1,597 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +"""Class for accessing OpenStack API for managing servers.""" + +import logging +from contextlib import contextmanager +from dataclasses import dataclass +from datetime import datetime +from functools import reduce +from pathlib import Path +from typing import Iterable, Iterator, cast + +import openstack +import openstack.exceptions +import paramiko +import yaml +from fabric import Connection as SSHConnection +from openstack.compute.v2.keypair import Keypair as OpenstackKeypair +from openstack.compute.v2.server import Server as OpenstackServer +from openstack.connection import Connection as OpenstackConnection +from openstack.network.v2.security_group import SecurityGroup as OpenstackSecurityGroup +from paramiko.ssh_exception import NoValidConnectionsError + +from errors import KeyfileError, OpenStackError, SSHError +from utilities import retry + +logger = logging.getLogger(__name__) + +_CLOUDS_YAML_PATH = Path.home() / ".config/openstack/clouds.yaml" + +# Update the version when the security group rules are not backward compatible. +_SECURITY_GROUP_NAME = "github-runner-v1" + +_CREATE_SERVER_TIMEOUT = 5 * 60 +_SSH_TIMEOUT = 30 +_SSH_KEY_PATH = Path("/home/ubuntu/.ssh") +_TEST_STRING = "test_string" + + +@dataclass +class OpenstackInstance: + """Represents an OpenStack instance. + + Attributes: + server_id: ID of server assigned by OpenStack. + server_name: Name of the server on OpenStack. + instance_id: ID used by OpenstackCloud class to manage the instances. See docs on the + OpenstackCloud. + addresses: IP addresses assigned to the server. + status: Status of the server. + """ + + server_id: str + server_name: str + instance_id: str + addresses: list[str] + status: str + + def __init__(self, server: OpenstackServer, prefix: str): + """Construct the object. + + Args: + server: The OpenStack server. + prefix: The name prefix for the servers. + + Raises: + ValueError: Provided server should not be managed under this prefix. 
+ """ + self.server_id = server.id + self.server_name = server.name + self.status = server.status + self.addresses = [ + address["addr"] + for network_addresses in server.addresses.values() + for address in network_addresses + ] + + if not self.server_name.startswith(f"{prefix}-"): + # Should never happen. + raise ValueError( + f"Found openstack server {server.name} managed under prefix {prefix}, contact devs" + ) + self.instance_id = self.server_name[len(prefix) + 1 :] + + +@contextmanager +@retry(tries=2, delay=5, local_logger=logger) +def _get_openstack_connection( + clouds_config: dict[str, dict], cloud: str +) -> Iterator[OpenstackConnection]: + """Create a connection context managed object, to be used within with statements. + + The file of _CLOUDS_YAML_PATH should only be modified by this function. + + Args: + clouds_config: The configuration in clouds.yaml format to apply. + cloud: The name of cloud to use in the clouds.yaml. + + Raises: + OpenStackError: if the credentials provided is not authorized. + + Yields: + An openstack.connection.Connection object. + """ + if not _CLOUDS_YAML_PATH.exists(): + _CLOUDS_YAML_PATH.parent.mkdir(parents=True, exist_ok=True) + + # Concurrency: Very small chance for the file to be corrupted due to multiple process calling + # this function and writing the file at the same time. This should cause the `conn.authorize` + # to fail, and retry of this function would resolve this. + _CLOUDS_YAML_PATH.write_text(data=yaml.dump(clouds_config), encoding="utf-8") + + # api documents that keystoneauth1.exceptions.MissingRequiredOptions can be raised but + # I could not reproduce it. Therefore, no catch here for such exception. + try: + with openstack.connect(cloud=cloud) as conn: + conn.authorize() + yield conn + # pylint thinks this isn't an exception, but does inherit from Exception class. 
+ except openstack.exceptions.HttpException as exc: # pylint: disable=bad-exception-cause + logger.exception("OpenStack API call failure") + raise OpenStackError("Failed OpenStack API call") from exc + + +class OpenstackCloud: + """Client to interact with OpenStack cloud. + + The OpenStack server name is managed by this cloud. Caller refers to the instances via + instance_id. If the caller needs the server name, e.g., for logging, it can be queried with + get_server_name. + """ + + def __init__(self, clouds_config: dict[str, dict], cloud: str, prefix: str): + """Create the object. + + Args: + clouds_config: The openstack clouds.yaml in dict format. + cloud: The name of cloud to use in the clouds.yaml. + prefix: Prefix attached to names of resource managed by this instance. Used for + identifying which resource belongs to this instance. + """ + self._clouds_config = clouds_config + self._cloud = cloud + self.prefix = prefix + + # Ignore "Too many arguments" as 6 args should be fine. Move to a dataclass if new args are + # added. + def launch_instance( # pylint: disable=R0913 + self, instance_id: str, image: str, flavor: str, network: str, cloud_init: str + ) -> OpenstackInstance: + """Create an OpenStack instance. + + Args: + instance_id: The instance ID to form the instance name. + image: The image used to create the instance. + flavor: The flavor used to create the instance. + network: The network used to create the instance. + cloud_init: The cloud init userdata to startup the instance. + + Raises: + OpenStackError: Unable to create OpenStack server. + + Returns: + The OpenStack instance created. 
+ """ + full_name = self.get_server_name(instance_id) + logger.info("Creating openstack server with %s", full_name) + + with _get_openstack_connection( + clouds_config=self._clouds_config, cloud=self._cloud + ) as conn: + security_group = OpenstackCloud._ensure_security_group(conn) + keypair = OpenstackCloud._setup_keypair(conn, full_name) + + try: + server = conn.create_server( + name=full_name, + image=image, + key_name=keypair.name, + flavor=flavor, + network=network, + security_groups=[security_group.id], + userdata=cloud_init, + auto_ip=False, + timeout=_CREATE_SERVER_TIMEOUT, + wait=True, + ) + except openstack.exceptions.ResourceTimeout as err: + logger.exception("Timeout creating openstack server %s", full_name) + logger.info( + "Attempting clean up of openstack server %s that timeout during creation", + full_name, + ) + self._delete_instance(conn, full_name) + raise OpenStackError(f"Timeout creating openstack server {full_name}") from err + except openstack.exceptions.SDKException as err: + logger.exception("Failed to create openstack server %s", full_name) + self._delete_keypair(conn, instance_id) + raise OpenStackError(f"Failed to create openstack server {full_name}") from err + + return OpenstackInstance(server, self.prefix) + + def get_instance(self, instance_id: str) -> OpenstackInstance | None: + """Get OpenStack instance by instance ID. + + Args: + instance_id: The instance ID. + + Returns: + The OpenStack instance if found. + """ + full_name = self.get_server_name(instance_id) + logger.info("Getting openstack server with %s", full_name) + + with _get_openstack_connection( + clouds_config=self._clouds_config, cloud=self._cloud + ) as conn: + server = OpenstackCloud._get_and_ensure_unique_server(conn, full_name) + if server is not None: + return OpenstackInstance(server, self.prefix) + return None + + def delete_instance(self, instance_id: str) -> None: + """Delete a openstack instance. 
+ + Args: + instance_id: The instance ID of the instance to delete. + """ + full_name = self.get_server_name(instance_id) + logger.info("Deleting openstack server with %s", full_name) + + with _get_openstack_connection( + clouds_config=self._clouds_config, cloud=self._cloud + ) as conn: + self._delete_instance(conn, full_name) + + def _delete_instance(self, conn: OpenstackConnection, full_name: str) -> None: + """Delete a openstack instance. + + Raises: + OpenStackError: Unable to delete OpenStack server. + + Args: + conn: The openstack connection to use. + full_name: The full name of the server. + """ + try: + server = OpenstackCloud._get_and_ensure_unique_server(conn, full_name) + if server is not None: + conn.delete_server(name_or_id=server.id) + OpenstackCloud._delete_keypair(conn, full_name) + except ( + openstack.exceptions.SDKException, + openstack.exceptions.ResourceTimeout, + ) as err: + raise OpenStackError(f"Failed to remove openstack runner {full_name}") from err + + def get_ssh_connection(self, instance: OpenstackInstance) -> SSHConnection: + """Get SSH connection to an OpenStack instance. + + Args: + instance: The OpenStack instance to connect to. + + Raises: + SSHError: Unable to get a working SSH connection to the instance. + KeyfileError: Unable to find the keyfile to connect to the instance. + + Returns: + SSH connection object. 
+ """ + key_path = OpenstackCloud._get_key_path(instance.server_name) + + if not key_path.exists(): + raise KeyfileError( + f"Missing keyfile for server: {instance.server_name}, key path: {key_path}" + ) + if not instance.addresses: + raise SSHError(f"No addresses found for OpenStack server {instance.server_name}") + + for ip in instance.addresses: + try: + connection = SSHConnection( + host=ip, + user="ubuntu", + connect_kwargs={"key_filename": str(key_path)}, + connect_timeout=_SSH_TIMEOUT, + ) + result = connection.run(f"echo {_TEST_STRING}", warn=True, timeout=_SSH_TIMEOUT) + if not result.ok: + logger.warning( + "SSH test connection failed, server: %s, address: %s", + instance.server_name, + ip, + ) + continue + if _TEST_STRING in result.stdout: + return connection + except (NoValidConnectionsError, TimeoutError, paramiko.ssh_exception.SSHException): + logger.warning( + "Unable to SSH into %s with address %s", + instance.server_name, + connection.host, + exc_info=True, + ) + continue + raise SSHError( + f"No connectable SSH addresses found, server: {instance.server_name}, " + f"addresses: {instance.addresses}" + ) + + def get_instances(self) -> tuple[OpenstackInstance, ...]: + """Get all OpenStack instances. + + Returns: + The OpenStack instances. 
+ """ + logger.info("Getting all openstack servers managed by the charm") + + with _get_openstack_connection( + clouds_config=self._clouds_config, cloud=self._cloud + ) as conn: + instance_list = self._get_openstack_instances(conn) + server_names = set(server.name for server in instance_list) + + server_list = [ + OpenstackCloud._get_and_ensure_unique_server(conn, name) for name in server_names + ] + return tuple( + OpenstackInstance(server, self.prefix) + for server in server_list + if server is not None + ) + + def cleanup(self) -> None: + """Cleanup unused key files and openstack keypairs.""" + with _get_openstack_connection( + clouds_config=self._clouds_config, cloud=self._cloud + ) as conn: + instances = self._get_openstack_instances(conn) + exclude_list = [server.name for server in instances] + self._cleanup_key_files(exclude_list) + self._cleanup_openstack_keypairs(conn, exclude_list) + + def get_server_name(self, instance_id: str) -> str: + """Get server name on OpenStack. + + Args: + instance_id: ID used to identify a instance. + + Returns: + The OpenStack server name. + """ + return f"{self.prefix}-{instance_id}" + + def _cleanup_key_files(self, exclude_instances: Iterable[str]) -> None: + """Delete all SSH key files except the specified instances. + + Args: + exclude_instances: The keys of these instance will not be deleted. + """ + logger.info("Cleaning up SSH key files") + exclude_filename = set( + OpenstackCloud._get_key_path(instance) for instance in exclude_instances + ) + + total = 0 + deleted = 0 + for path in _SSH_KEY_PATH.iterdir(): + # Find key file from this application. 
+ if path.is_file() and path.name.startswith(self.prefix) and path.name.endswith(".key"): + total += 1 + if path in exclude_filename: + continue + path.unlink() + deleted += 1 + logger.info("Found %s key files, clean up %s key files", total, deleted) + + def _cleanup_openstack_keypairs( + self, conn: OpenstackConnection, exclude_instances: Iterable[str] + ) -> None: + """Delete all OpenStack keypairs except the specified instances. + + Args: + conn: The Openstack connection instance. + exclude_instances: The keys of these instance will not be deleted. + """ + logger.info("Cleaning up openstack keypairs") + exclude_instance_set = set(exclude_instances) + keypairs = conn.list_keypairs() + for key in keypairs: + # The `name` attribute is of resource.Body type. + if key.name and str(key.name).startswith(self.prefix): + if str(key.name) in exclude_instance_set: + continue + try: + self._delete_keypair(conn, key.name) + except openstack.exceptions.SDKException: + logger.warning( + "Unable to delete OpenStack keypair associated with deleted key file %s ", + key.name, + ) + + def _get_openstack_instances(self, conn: OpenstackConnection) -> tuple[OpenstackServer, ...]: + """Get the OpenStack servers managed by this unit. + + Args: + conn: The connection object to access OpenStack cloud. + + Returns: + List of OpenStack instances. + """ + return tuple( + server + for server in cast(list[OpenstackServer], conn.list_servers()) + if server.name.startswith(f"{self.prefix}-") + ) + + @staticmethod + def _get_and_ensure_unique_server( + conn: OpenstackConnection, name: str + ) -> OpenstackServer | None: + """Get the latest server of the name and ensure it is unique. + + If multiple servers with the same name are found, the latest server in creation time is + returned. Other servers is deleted. + + Args: + conn: The connection to OpenStack. + name: The name of the OpenStack name. + + Returns: + A server with the name. 
+ """ + servers: list[OpenstackServer] = conn.search_servers(name) + + if not servers: + return None + + # 2024/08/14: The `format` arg for `strptime` is the default format. + # This is only provided to get around a bug of the function with type checking. + latest_server = reduce( + lambda a, b: ( + a + if datetime.strptime(a.created_at, "a %b %d %H:%M:%S %Y") + < datetime.strptime(b.create_at, "a %b %d %H:%M:%S %Y") + else b + ), + servers, + ) + outdated_servers = filter(lambda x: x != latest_server, servers) + for server in outdated_servers: + try: + conn.delete_server(name_or_id=server.id) + except (openstack.exceptions.SDKException, openstack.exceptions.ResourceTimeout): + logger.warning( + "Unable to delete server with duplicate name %s with ID %s", + name, + server.id, + stack_info=True, + ) + + return latest_server + + @staticmethod + def _get_key_path(name: str) -> Path: + """Get the filepath for storing private SSH of a runner. + + Args: + name: The name of the runner. + + Returns: + Path to reserved for the key file of the runner. + """ + return _SSH_KEY_PATH / f"{name}.key" + + @staticmethod + def _setup_keypair(conn: OpenstackConnection, name: str) -> OpenstackKeypair: + """Create OpenStack keypair. + + Args: + conn: The connection object to access OpenStack cloud. + name: The name of the keypair. + + Returns: + The OpenStack keypair. + """ + key_path = OpenstackCloud._get_key_path(name) + + if key_path.exists(): + logger.warning("Existing private key file for %s found, removing it.", name) + key_path.unlink(missing_ok=True) + + keypair = conn.create_keypair(name=name) + key_path.parent.mkdir(parents=True, exist_ok=True) + key_path.write_text(keypair.private_key) + key_path.chmod(0o400) + return keypair + + @staticmethod + def _delete_keypair(conn: OpenstackConnection, name: str) -> None: + """Delete OpenStack keypair. + + Args: + conn: The connection object to access OpenStack cloud. + name: The name of the keypair. 
+ """ + try: + # Keypair have unique names, access by ID is not needed. + if not conn.delete_keypair(name): + logger.warning("Unable to delete keypair for %s", name) + except (openstack.exceptions.SDKException, openstack.exceptions.ResourceTimeout): + logger.warning("Unable to delete keypair for %s", name, stack_info=True) + + key_path = OpenstackCloud._get_key_path(name) + key_path.unlink(missing_ok=True) + + @staticmethod + def _ensure_security_group(conn: OpenstackConnection) -> OpenstackSecurityGroup: + """Ensure runner security group exists. + + Args: + conn: The connection object to access OpenStack cloud. + + Returns: + The security group with the rules for runners. + """ + rule_exists_icmp = False + rule_exists_ssh = False + rule_exists_tmate_ssh = False + + security_group_list = conn.list_security_groups(filters={"name": _SECURITY_GROUP_NAME}) + # Pick the first security_group returned. + security_group = next(iter(security_group_list), None) + if security_group is None: + logger.info("Security group %s not found, creating it", _SECURITY_GROUP_NAME) + security_group = conn.create_security_group( + name=_SECURITY_GROUP_NAME, + description="For servers managed by the github-runner charm.", + ) + else: + existing_rules = security_group.security_group_rules + for rule in existing_rules: + if rule["protocol"] == "icmp": + logger.debug( + "Found ICMP rule in existing security group %s of ID %s", + _SECURITY_GROUP_NAME, + security_group.id, + ) + rule_exists_icmp = True + if ( + rule["protocol"] == "tcp" + and rule["port_range_min"] == rule["port_range_max"] == 22 + ): + logger.debug( + "Found SSH rule in existing security group %s of ID %s", + _SECURITY_GROUP_NAME, + security_group.id, + ) + rule_exists_ssh = True + if ( + rule["protocol"] == "tcp" + and rule["port_range_min"] == rule["port_range_max"] == 10022 + ): + logger.debug( + "Found tmate SSH rule in existing security group %s of ID %s", + _SECURITY_GROUP_NAME, + security_group.id, + ) + 
rule_exists_tmate_ssh = True + + if not rule_exists_icmp: + conn.create_security_group_rule( + secgroup_name_or_id=security_group.id, + protocol="icmp", + direction="ingress", + ethertype="IPv4", + ) + if not rule_exists_ssh: + conn.create_security_group_rule( + secgroup_name_or_id=security_group.id, + port_range_min="22", + port_range_max="22", + protocol="tcp", + direction="ingress", + ethertype="IPv4", + ) + if not rule_exists_tmate_ssh: + conn.create_security_group_rule( + secgroup_name_or_id=security_group.id, + port_range_min="10022", + port_range_max="10022", + protocol="tcp", + direction="egress", + ethertype="IPv4", + ) + return security_group diff --git a/src/openstack_cloud/openstack_manager.py b/src/openstack_cloud/openstack_manager.py index 75e75d65f..379d2ae4c 100644 --- a/src/openstack_cloud/openstack_manager.py +++ b/src/openstack_cloud/openstack_manager.py @@ -32,7 +32,7 @@ import openstack.exceptions import openstack.image.v2.image import paramiko -from fabric import Connection as SshConnection +from fabric import Connection as SSHConnection from openstack.compute.v2.server import Server from openstack.connection import Connection as OpenstackConnection from openstack.exceptions import SDKException @@ -62,7 +62,7 @@ from repo_policy_compliance_client import RepoPolicyComplianceClient from runner_manager import IssuedMetricEventsStats from runner_manager_type import FlushMode, OpenstackRunnerManagerConfig -from runner_type import GithubPath, RunnerByHealth, RunnerGithubInfo +from runner_type import GithubPath, RunnerGithubInfo, RunnerNameByHealth from utilities import retry, set_env_var logger = logging.getLogger(__name__) @@ -237,8 +237,6 @@ def _generate_runner_env( pre_job_script=str(PRE_JOB_SCRIPT), dockerhub_mirror=dockerhub_mirror or "", ssh_debug_info=(secrets.choice(ssh_debug_connections) if ssh_debug_connections else None), - # Proxies are handled by aproxy. 
- proxies={}, ) @@ -419,7 +417,7 @@ def get_github_runner_info(self) -> tuple[RunnerGithubInfo, ...]: if runner["name"].startswith(f"{self.instance_name}-") ) - def _get_openstack_runner_status(self, conn: OpenstackConnection) -> RunnerByHealth: + def _get_openstack_runner_status(self, conn: OpenstackConnection) -> RunnerNameByHealth: """Get status on OpenStack of each runner. Args: @@ -440,7 +438,7 @@ def _get_openstack_runner_status(self, conn: OpenstackConnection) -> RunnerByHea else: healthy_runner.append(instance.name) - return RunnerByHealth(healthy=tuple(healthy_runner), unhealthy=tuple(unhealthy_runner)) + return RunnerNameByHealth(healthy=tuple(healthy_runner), unhealthy=tuple(unhealthy_runner)) def _get_openstack_instances(self, conn: OpenstackConnection) -> list[Server]: """Get the OpenStack servers managed by this unit. @@ -556,7 +554,7 @@ def _ssh_health_check(conn: OpenstackConnection, server_name: str, startup: bool @retry(tries=3, delay=5, max_delay=60, backoff=2, local_logger=logger) def _get_ssh_connection( conn: OpenstackConnection, server_name: str, timeout: int = 30 - ) -> SshConnection: + ) -> SSHConnection: """Get a valid ssh connection within a network for a given openstack instance. The SSH connection will attempt to establish connection until the timeout configured. @@ -593,7 +591,7 @@ def _get_ssh_connection( ] for ip in server_addresses: try: - connection = SshConnection( + connection = SSHConnection( host=ip, user="ubuntu", connect_kwargs={"key_filename": str(key_path)}, @@ -1093,7 +1091,7 @@ def _pull_metrics(self, conn: OpenstackConnection, instance_name: str) -> None: return def _pull_file( - self, ssh_conn: SshConnection, remote_path: str, local_path: str, max_size: int + self, ssh_conn: SSHConnection, remote_path: str, local_path: str, max_size: int ) -> None: """Pull file from the runner instance. 
@@ -1302,7 +1300,7 @@ def _clean_up_openstack_keypairs( ) def _clean_up_runners( - self, conn: OpenstackConnection, runner_by_health: RunnerByHealth, remove_token: str + self, conn: OpenstackConnection, runner_by_health: RunnerNameByHealth, remove_token: str ) -> None: """Clean up offline or unhealthy runners. @@ -1355,7 +1353,7 @@ def _scale( self, quantity: int, conn: OpenstackConnection, - runner_by_health: RunnerByHealth, + runner_by_health: RunnerNameByHealth, remove_token: str, ) -> int: """Scale the number of runners. @@ -1462,7 +1460,7 @@ def _issue_runner_metrics(self, conn: OpenstackConnection) -> IssuedMetricEvents for extracted_metrics in runner_metrics.extract( metrics_storage_manager=metrics_storage, - ignore_runners=instance_names, + runners=instance_names, ): try: job_metrics = github_metrics.job( @@ -1488,7 +1486,7 @@ def _issue_reconciliation_metric( metric_stats: IssuedMetricEventsStats, reconciliation_start_ts: float, reconciliation_end_ts: float, - runner_states: RunnerByHealth, + runner_states: RunnerNameByHealth, ) -> None: """Issue reconciliation metric. @@ -1589,7 +1587,7 @@ def _kill_runner_processes(self, conn: OpenstackConnection, mode: FlushMode) -> servers = self._get_openstack_instances(conn=conn) for server in servers: - ssh_conn: SshConnection = self._get_ssh_connection(conn=conn, server_name=server.name) + ssh_conn: SSHConnection = self._get_ssh_connection(conn=conn, server_name=server.name) result: invoke.runners.Result = ssh_conn.run( killer_command, warn=True, diff --git a/src/openstack_cloud/openstack_runner_manager.py b/src/openstack_cloud/openstack_runner_manager.py new file mode 100644 index 000000000..6323b65fa --- /dev/null +++ b/src/openstack_cloud/openstack_runner_manager.py @@ -0,0 +1,806 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +"""Manager for self-hosted runner on OpenStack.""" + +import logging +import secrets +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Iterator, Sequence + +import invoke +import jinja2 +import paramiko +import paramiko.ssh_exception +from fabric import Connection as SSHConnection + +from charm_state import GithubOrg +from errors import ( + CreateMetricsStorageError, + GetMetricsStorageError, + IssueMetricEventError, + KeyfileError, + OpenStackError, + RunnerCreateError, + RunnerStartError, + SSHError, +) +from manager.cloud_runner_manager import ( + CloudRunnerInstance, + CloudRunnerManager, + CloudRunnerState, + GitHubRunnerConfig, + InstanceId, + SupportServiceConfig, +) +from manager.runner_manager import HealthState +from metrics import events as metric_events +from metrics import runner as runner_metrics +from metrics import storage as metrics_storage +from openstack_cloud.openstack_cloud import OpenstackCloud, OpenstackInstance +from openstack_cloud.openstack_manager import GithubRunnerRemoveError +from repo_policy_compliance_client import RepoPolicyComplianceClient +from utilities import retry + +logger = logging.getLogger(__name__) + +BUILD_OPENSTACK_IMAGE_SCRIPT_FILENAME = "scripts/build-openstack-image.sh" +_CONFIG_SCRIPT_PATH = Path("/home/ubuntu/actions-runner/config.sh") + +RUNNER_APPLICATION = Path("/home/ubuntu/actions-runner") +METRICS_EXCHANGE_PATH = Path("/home/ubuntu/metrics-exchange") +PRE_JOB_SCRIPT = RUNNER_APPLICATION / "pre-job.sh" +MAX_METRICS_FILE_SIZE = 1024 + +RUNNER_STARTUP_PROCESS = "/home/ubuntu/actions-runner/run.sh" +RUNNER_LISTENER_PROCESS = "Runner.Listener" +RUNNER_WORKER_PROCESS = "Runner.Worker" +CREATE_SERVER_TIMEOUT = 5 * 60 + + +class _PullFileError(Exception): + """Represents an error while pulling a file from the runner instance.""" + + +@dataclass +class OpenStackCloudConfig: + """Configuration for OpenStack cloud authorisation information. 
+ + Attributes: + clouds_config: The clouds.yaml. + cloud: The cloud name to connect to. + """ + + clouds_config: dict[str, dict] + cloud: str + + +@dataclass +class OpenStackServerConfig: + """Configuration for OpenStack server. + + Attributes: + image: The image name for runners to use. + flavor: The flavor name for runners to use. + network: The network name for runners to use. + """ + + image: str + flavor: str + network: str + + +@dataclass +class _RunnerHealth: + """Runners with health state. + + Attributes: + healthy: The list of healthy runners. + unhealthy: The list of unhealthy runners. + """ + + healthy: tuple[OpenstackInstance, ...] + unhealthy: tuple[OpenstackInstance, ...] + + +class OpenstackRunnerManager(CloudRunnerManager): + """Manage self-hosted runner on OpenStack cloud. + + Attributes: + name_prefix: The name prefix of the runners created. + """ + + # Ignore "Too many arguments", as the class requires a lot of configurations. + def __init__( # pylint: disable=R0913 + self, + prefix: str, + cloud_config: OpenStackCloudConfig, + server_config: OpenStackServerConfig, + runner_config: GitHubRunnerConfig, + service_config: SupportServiceConfig, + ) -> None: + """Construct the object. + + Args: + prefix: The prefix to runner name. + cloud_config: The configuration for OpenStack authorisation. + server_config: The configuration for creating OpenStack server. + runner_config: The configuration for the runner. + service_config: The configuration of supporting services of the runners. + """ + self._prefix = prefix + self._cloud_config = cloud_config + self._server_config = server_config + self._runner_config = runner_config + self._service_config = service_config + self._openstack_cloud = OpenstackCloud( + clouds_config=self._cloud_config.clouds_config, + cloud=self._cloud_config.cloud, + prefix=self.name_prefix, + ) + + @property + def name_prefix(self) -> str: + """The prefix of runner names. 
+ + Returns: + The prefix of the runner names managed by this class. + """ + return self._prefix + + def create_runner(self, registration_token: str) -> InstanceId: + """Create a self-hosted runner. + + Args: + registration_token: The GitHub registration token for registering runners. + + Raises: + RunnerCreateError: Unable to create runner due to OpenStack issues. + + Returns: + Instance ID of the runner. + """ + start_timestamp = time.time() + instance_id = OpenstackRunnerManager._generate_instance_id() + instance_name = self._openstack_cloud.get_server_name(instance_id=instance_id) + cloud_init = self._generate_cloud_init( + instance_name=instance_name, registration_token=registration_token + ) + try: + instance = self._openstack_cloud.launch_instance( + instance_id=instance_id, + image=self._server_config.image, + flavor=self._server_config.flavor, + network=self._server_config.network, + cloud_init=cloud_init, + ) + except OpenStackError as err: + raise RunnerCreateError(f"Failed to create {instance_name} openstack runner") from err + + self._wait_runner_startup(instance) + self._wait_runner_running(instance) + + end_timestamp = time.time() + OpenstackRunnerManager._issue_runner_installed_metric( + name=instance_name, + flavor=self.name_prefix, + install_start_timestamp=start_timestamp, + install_end_timestamp=end_timestamp, + ) + return instance_id + + def get_runner(self, instance_id: InstanceId) -> CloudRunnerInstance | None: + """Get a self-hosted runner by instance id. + + Args: + instance_id: The instance id. + + Returns: + Information on the runner instance. 
+ """ + instance = self._openstack_cloud.get_instance(instance_id) + healthy = self._runner_health_check(instance=instance) + return ( + CloudRunnerInstance( + name=instance.server_name, + instance_id=instance_id, + health=HealthState.HEALTHY if healthy else HealthState.UNHEALTHY, + state=CloudRunnerState.from_openstack_server_status(instance.status), + ) + if instance is not None + else None + ) + + def get_runners( + self, states: Sequence[CloudRunnerState] | None = None + ) -> tuple[CloudRunnerInstance, ...]: + """Get self-hosted runners by state. + + Args: + states: Filter for the runners with these github states. If None all states will be + included. + + Returns: + Information on the runner instances. + """ + instance_list = self._openstack_cloud.get_instances() + instance_list = [ + CloudRunnerInstance( + name=instance.server_name, + instance_id=instance.instance_id, + health=( + HealthState.HEALTHY + if self._runner_health_check(instance) + else HealthState.UNHEALTHY + ), + state=CloudRunnerState.from_openstack_server_status(instance.status), + ) + for instance in instance_list + ] + if states is None: + return tuple(instance_list) + return tuple(instance for instance in instance_list if instance.state in states) + + def delete_runner( + self, instance_id: InstanceId, remove_token: str + ) -> runner_metrics.RunnerMetrics | None: + """Delete self-hosted runners. + + Args: + instance_id: The instance id of the runner to delete. + remove_token: The GitHub remove token. + + Returns: + Any metrics collected during the deletion of the runner. 
+ """ + instance = self._openstack_cloud.get_instance(instance_id) + if instance is None: + logger.warning( + "Unable to delete instance %s as it is not found", + self._openstack_cloud.get_server_name(instance_id), + ) + return None + + extracted_metrics = runner_metrics.extract( + metrics_storage_manager=metrics_storage, runners=set([instance.server_name]) + ) + self._delete_runner(instance, remove_token) + return next(extracted_metrics, None) + + def flush_runners( + self, remove_token: str, busy: bool = False + ) -> Iterator[runner_metrics.RunnerMetrics]: + """Remove idle and/or busy runners. + + Args: + remove_token: + busy: If false, only idle runners are removed. If true, both idle and busy runners are + removed. + + Returns: + Any metrics retrieved from flushed runners. + """ + instance_list = self._openstack_cloud.get_instances() + for instance in instance_list: + try: + self._check_state_and_flush(instance, busy) + except SSHError: + logger.warning( + "Unable to determine state of %s and kill runner process due to SSH issues", + instance.server_name, + ) + continue + return self.cleanup(remove_token) + + def cleanup(self, remove_token: str) -> Iterator[runner_metrics.RunnerMetrics]: + """Cleanup runner and resource on the cloud. + + Args: + remove_token: The GitHub remove token. + + Returns: + Any metrics retrieved from cleanup runners. + """ + runners = self._get_runners_health() + healthy_runner_names = [runner.server_name for runner in runners.healthy] + metrics = runner_metrics.extract( + metrics_storage_manager=metrics_storage, runners=set(healthy_runner_names) + ) + for runner in runners.unhealthy: + self._delete_runner(runner, remove_token) + + self._openstack_cloud.cleanup() + return metrics + + def _delete_runner(self, instance: OpenstackInstance, remove_token: str) -> None: + """Delete self-hosted runners by openstack instance. + + Args: + instance: The OpenStack instance. + remove_token: The GitHub remove token. 
+ """ + try: + ssh_conn = self._openstack_cloud.get_ssh_connection(instance) + self._pull_runner_metrics(instance.server_name, ssh_conn) + + try: + OpenstackRunnerManager._run_runner_removal_script( + instance.server_name, ssh_conn, remove_token + ) + except GithubRunnerRemoveError: + logger.warning( + "Unable to run github runner removal script for %s", + instance.server_name, + stack_info=True, + ) + except SSHError: + logger.exception( + "Failed to get SSH connection while removing %s", instance.server_name + ) + logger.warning( + "Skipping runner remove script for %s due to SSH issues", instance.server_name + ) + + try: + self._openstack_cloud.delete_instance(instance.instance_id) + except OpenStackError: + logger.exception( + "Unable to delete openstack instance for runner %s", instance.server_name + ) + + def _get_runners_health(self) -> _RunnerHealth: + """Get runners by health state. + + Returns: + Runners by health state. + """ + runner_list = self._openstack_cloud.get_instances() + + healthy, unhealthy = [], [] + for runner in runner_list: + if self._runner_health_check(runner): + healthy.append(runner) + else: + unhealthy.append(runner) + return _RunnerHealth(healthy=tuple(healthy), unhealthy=tuple(unhealthy)) + + def _runner_health_check(self, instance: OpenstackInstance) -> bool: + """Run health check on a runner. + + Args: + instance: The instance hosting the runner to run health check on. + + Returns: + True if runner is healthy. + """ + cloud_state = CloudRunnerState.from_openstack_server_status(instance.status) + return cloud_state not in set( + ( + CloudRunnerState.DELETED, + CloudRunnerState.ERROR, + CloudRunnerState.STOPPED, + ) + ) and self._health_check(instance) + + def _generate_cloud_init(self, instance_name: str, registration_token: str) -> str: + """Generate cloud init userdata. + + This is the script the openstack server runs on startup. + + Args: + instance_name: The name of the instance. 
+ registration_token: The GitHub runner registration token. + + Returns: + The cloud init userdata for openstack instance. + """ + jinja = jinja2.Environment(loader=jinja2.FileSystemLoader("templates"), autoescape=True) + + env_contents = jinja.get_template("env.j2").render( + pre_job_script=str(PRE_JOB_SCRIPT), + dockerhub_mirror=self._service_config.dockerhub_mirror or "", + ssh_debug_info=( + secrets.choice(self._service_config.ssh_debug_connections) + if self._service_config.ssh_debug_connections + else None + ), + ) + + pre_job_contents_dict = { + "issue_metrics": True, + "metrics_exchange_path": str(METRICS_EXCHANGE_PATH), + "do_repo_policy_check": False, + } + repo_policy = self._get_repo_policy_compliance_client() + if repo_policy is not None: + pre_job_contents_dict.update( + { + "repo_policy_base_url": repo_policy.base_url, + "repo_policy_one_time_token": repo_policy.get_one_time_token(), + "do_repo_policy_check": True, + } + ) + + pre_job_contents = jinja.get_template("pre-job.j2").render(pre_job_contents_dict) + + runner_group = None + if isinstance(self._runner_config.github_path, GithubOrg): + runner_group = self._runner_config.github_path.group + aproxy_address = ( + self._service_config.proxy_config.aproxy_address + if self._service_config.proxy_config is not None + else None + ) + return jinja.get_template("openstack-userdata.sh.j2").render( + github_url=f"https://github.com/{self._runner_config.github_path.path()}", + runner_group=runner_group, + token=registration_token, + instance_labels=",".join(self._runner_config.labels), + instance_name=instance_name, + env_contents=env_contents, + pre_job_contents=pre_job_contents, + metrics_exchange_path=str(METRICS_EXCHANGE_PATH), + aproxy_address=aproxy_address, + dockerhub_mirror=self._service_config.dockerhub_mirror, + ) + + def _get_repo_policy_compliance_client(self) -> RepoPolicyComplianceClient | None: + """Get repo policy compliance client. + + Returns: + The repo policy compliance client. 
+ """ + if self._service_config.repo_policy_url and self._service_config.repo_policy_token: + return RepoPolicyComplianceClient( + self._service_config.repo_policy_url, self._service_config.repo_policy_token + ) + return None + + @retry(tries=3, delay=5, backoff=2, local_logger=logger) + def _check_state_and_flush(self, instance: OpenstackInstance, busy: bool) -> None: + """Kill runner process depending on idle or busy. + + Due to update to runner state has some delay with GitHub API. The state of the runner is + determined by which runner processes are running. If the Runner.Worker process is running, + the runner is deemed to be busy. + + Raises: + SSHError: Unable to check the state of the runner and kill the runner process due to + SSH failure. + + Args: + instance: The openstack instance to kill the runner process. + busy: Kill the process if runner is busy, else only kill runner + process if runner is idle. + """ + try: + ssh_conn = self._openstack_cloud.get_ssh_connection(instance) + except KeyfileError: + logger.exception( + "Health check failed due to unable to find keyfile for %s", instance.server_name + ) + return + except SSHError: + logger.exception( + "SSH connection failure with %s during flushing", instance.server_name + ) + raise + + # Using a single command to determine the state and kill the process if needed. + # This makes it more robust when network is unstable. + if busy: + logger.info("Attempting to kill all runner process on %s", instance.server_name) + # kill both Runner.Listener and Runner.Worker processes. + # This kills pre-job.sh, a child process of Runner.Worker. + kill_command = ( + f"pgrep -x {RUNNER_LISTENER_PROCESS} && kill $(pgrep -x {RUNNER_LISTENER_PROCESS});" + f"pgrep -x {RUNNER_WORKER_PROCESS} && kill $(pgrep -x {RUNNER_WORKER_PROCESS});" + ) + else: + logger.info( + "Attempting to kill runner process on %s if not busy", instance.server_name + ) + # Only kill Runner.Listener if Runner.Worker does not exist. 
+            kill_command = (
+                f"pgrep -x {RUNNER_WORKER_PROCESS} || pgrep -x {RUNNER_LISTENER_PROCESS} && "
+                f"kill $(pgrep -x {RUNNER_LISTENER_PROCESS})"
+            )
+        # Checking the result of kill command is not useful, as the exit code does not reveal much.
+        ssh_conn.run(kill_command, warn=True)
+
+    @retry(tries=3, delay=5, backoff=2, local_logger=logger)
+    def _health_check(self, instance: OpenstackInstance) -> bool:
+        """Check whether runner is healthy.
+
+        Args:
+            instance: The OpenStack instance to conduct the health check on.
+
+        Raises:
+            SSHError: Unable to get an SSH connection to the instance.
+
+        Returns:
+            Whether the runner is healthy.
+        """
+        try:
+            ssh_conn = self._openstack_cloud.get_ssh_connection(instance)
+        except KeyfileError:
+            logger.exception(
+                "Health check failed due to unable to find keyfile for %s", instance.server_name
+            )
+            return False
+        except SSHError:
+            logger.exception(
+                "SSH connection failure with %s during health check", instance.server_name
+            )
+            raise
+        return OpenstackRunnerManager._run_health_check(ssh_conn, instance.server_name)
+
+    @staticmethod
+    def _run_health_check(ssh_conn: SSHConnection, name: str) -> bool:
+        """Run a health check for runner process.
+
+        Args:
+            ssh_conn: The SSH connection to the runner.
+            name: The name of the runner.
+
+        Returns:
+            Whether the health check succeeded.
+        """
+        result: invoke.runners.Result = ssh_conn.run("ps aux", warn=True)
+        if not result.ok:
+            logger.warning("SSH run of `ps aux` failed on %s: %s", name, result.stderr)
+            return False
+        if (
+            RUNNER_WORKER_PROCESS not in result.stdout
+            and RUNNER_LISTENER_PROCESS not in result.stdout
+        ):
+            logger.warning("Runner process not found on %s", name)
+            return False
+        return True
+
+    @retry(tries=10, delay=60, local_logger=logger)
+    def _wait_runner_startup(self, instance: OpenstackInstance) -> None:
+        """Wait until the runner startup process is found on the runner.
+
+        Args:
+            instance: The runner instance.
+
+        Raises:
+            RunnerStartError: The runner startup process was not found on the runner.
+        """
+        try:
+            ssh_conn = self._openstack_cloud.get_ssh_connection(instance)
+        except SSHError as err:
+            raise RunnerStartError(
+                f"Failed to SSH to {instance.server_name} during creation possible due to setup "
+                "not completed"
+            ) from err
+
+        result: invoke.runners.Result = ssh_conn.run("ps aux", warn=True)
+        if not result.ok:
+            logger.warning("SSH run of `ps aux` failed on %s", instance.server_name)
+            raise RunnerStartError(f"Unable to SSH run `ps aux` on {instance.server_name}")
+        if RUNNER_STARTUP_PROCESS not in result.stdout:
+            logger.warning("Runner startup process not found on %s", instance.server_name)
+            raise RunnerStartError(f"Runner startup process not found on {instance.server_name}")
+        logger.info("Runner startup process found to be healthy on %s", instance.server_name)
+
+    @retry(tries=5, delay=60, local_logger=logger)
+    def _wait_runner_running(self, instance: OpenstackInstance) -> None:
+        """Wait until the runner is running.
+
+        Args:
+            instance: The runner instance.
+
+        Raises:
+            RunnerStartError: The runner process was not found on the runner.
+        """
+        try:
+            ssh_conn = self._openstack_cloud.get_ssh_connection(instance)
+        except SSHError as err:
+            raise RunnerStartError(
+                f"Failed to SSH connect to {instance.server_name} openstack runner"
+            ) from err
+
+        if not self._run_health_check(ssh_conn=ssh_conn, name=instance.server_name):
+            logger.info("Runner process not found on %s", instance.server_name)
+            raise RunnerStartError(
+                f"Runner process on {instance.server_name} failed to initialize on after starting"
+            )
+
+        logger.info("Runner process found to be healthy on %s", instance.server_name)
+
+    @staticmethod
+    def _generate_instance_id() -> InstanceId:
+        """Generate an instance id.
+
+        Returns:
+            The id.
+ """ + return secrets.token_hex(12) + + @staticmethod + def _issue_runner_installed_metric( + name: str, + flavor: str, + install_start_timestamp: float, + install_end_timestamp: float, + ) -> None: + """Issue metric for runner installed event. + + Args: + name: The name of the runner. + flavor: The flavor of the runner. + install_start_timestamp: The timestamp of installation start. + install_end_timestamp: The timestamp of installation end. + """ + try: + metric_events.issue_event( + event=metric_events.RunnerInstalled( + timestamp=install_start_timestamp, + flavor=flavor, + duration=install_end_timestamp - install_start_timestamp, + ) + ) + except IssueMetricEventError: + logger.exception("Failed to issue RunnerInstalled metric") + + try: + storage = metrics_storage.create(name) + except CreateMetricsStorageError: + logger.exception( + "Failed to create metrics storage for runner %s, " + "will not be able to issue all metrics.", + name, + ) + else: + try: + (storage.path / runner_metrics.RUNNER_INSTALLED_TS_FILE_NAME).write_text( + str(install_end_timestamp), encoding="utf-8" + ) + except FileNotFoundError: + logger.exception( + "Failed to write runner-installed.timestamp into metrics storage " + "for runner %s, will not be able to issue all metrics.", + name, + ) + + @staticmethod + def _pull_runner_metrics(name: str, ssh_conn: SSHConnection) -> None: + """Pull metrics from runner. + + Args: + name: The name of the runner. + ssh_conn: The SSH connection to the runner. 
+ """ + try: + storage = metrics_storage.get(name) + except GetMetricsStorageError: + logger.exception( + "Failed to get shared metrics storage for runner %s, " + "will not be able to issue all metrics.", + name, + ) + return + + try: + OpenstackRunnerManager._ssh_pull_file( + ssh_conn=ssh_conn, + remote_path=str(METRICS_EXCHANGE_PATH / "pre-job-metrics.json"), + local_path=str(storage.path / "pre-job-metrics.json"), + max_size=MAX_METRICS_FILE_SIZE, + ) + OpenstackRunnerManager._ssh_pull_file( + ssh_conn=ssh_conn, + remote_path=str(METRICS_EXCHANGE_PATH / "post-job-metrics.json"), + local_path=str(storage.path / "post-job-metrics.json"), + max_size=MAX_METRICS_FILE_SIZE, + ) + except _PullFileError as exc: + logger.warning( + "Failed to pull metrics for %s: %s . Will not be able to issue all metrics", + name, + exc, + ) + + @staticmethod + def _ssh_pull_file( + ssh_conn: SSHConnection, remote_path: str, local_path: str, max_size: int + ) -> None: + """Pull file from the runner instance. + + Args: + ssh_conn: The SSH connection instance. + remote_path: The file path on the runner instance. + local_path: The local path to store the file. + max_size: If the file is larger than this, it will not be pulled. + + Raises: + _PullFileError: Unable to pull the file from the runner instance. + SSHError: Issue with SSH connection. 
+ """ + try: + result = ssh_conn.run(f"stat -c %s {remote_path}", warn=True) + except ( + TimeoutError, + paramiko.ssh_exception.NoValidConnectionsError, + paramiko.ssh_exception.SSHException, + ) as exc: + raise SSHError(f"Unable to SSH into {ssh_conn.host}") from exc + if not result.ok: + logger.warning( + ( + "Unable to get file size of %s on instance %s, " + "exit code: %s, stdout: %s, stderr: %s" + ), + remote_path, + ssh_conn.host, + result.return_code, + result.stdout, + result.stderr, + ) + raise _PullFileError(f"Unable to get file size of {remote_path}") + + stdout = result.stdout + try: + stdout.strip() + size = int(stdout) + if size > max_size: + raise _PullFileError(f"File size of {remote_path} too large {size} > {max_size}") + except ValueError as exc: + raise _PullFileError(f"Invalid file size for {remote_path}: stdout") from exc + + try: + ssh_conn.get(remote=remote_path, local=local_path) + except ( + TimeoutError, + paramiko.ssh_exception.NoValidConnectionsError, + paramiko.ssh_exception.SSHException, + ) as exc: + raise SSHError(f"Unable to SSH into {ssh_conn.host}") from exc + except OSError as exc: + raise _PullFileError(f"Unable to retrieve file {remote_path}") from exc + + @staticmethod + def _run_runner_removal_script( + instance_name: str, ssh_conn: SSHConnection, remove_token: str + ) -> None: + """Run Github runner removal script. + + Args: + instance_name: The name of the runner instance. + ssh_conn: The SSH connection to the runner instance. + remove_token: The GitHub instance removal token. + + Raises: + GithubRunnerRemoveError: Unable to remove runner from GitHub. 
+ """ + try: + result = ssh_conn.run( + f"{_CONFIG_SCRIPT_PATH} remove --token {remove_token}", + warn=True, + ) + if result.ok: + return + + logger.warning( + ( + "Unable to run removal script on instance %s, " + "exit code: %s, stdout: %s, stderr: %s" + ), + instance_name, + result.return_code, + result.stdout, + result.stderr, + ) + raise GithubRunnerRemoveError(f"Failed to remove runner {instance_name} from Github.") + except ( + TimeoutError, + paramiko.ssh_exception.NoValidConnectionsError, + paramiko.ssh_exception.SSHException, + ) as exc: + raise GithubRunnerRemoveError( + f"Failed to remove runner {instance_name} from Github." + ) from exc diff --git a/src/runner_manager.py b/src/runner_manager.py index 09487f453..8d68a68c9 100644 --- a/src/runner_manager.py +++ b/src/runner_manager.py @@ -43,7 +43,7 @@ from runner import LXD_PROFILE_YAML, CreateRunnerConfig, Runner, RunnerConfig, RunnerStatus from runner_manager_type import FlushMode, RunnerInfo, RunnerManagerClients, RunnerManagerConfig from runner_type import ProxySetting as RunnerProxySetting -from runner_type import RunnerByHealth +from runner_type import RunnerNameByHealth from utilities import execute_command, retry, set_env_var REMOVED_RUNNER_LOG_STR = "Removed runner: %s" @@ -222,7 +222,7 @@ def get_github_info(self) -> Iterator[RunnerInfo]: for runner in remote_runners.values() ) - def _get_runner_health_states(self) -> RunnerByHealth: + def _get_runner_health_states(self) -> RunnerNameByHealth: """Get all runners sorted into health groups. 
Returns: @@ -247,7 +247,7 @@ def _get_runner_health_states(self) -> RunnerByHealth: else: unhealthy.append(runner.name) - return RunnerByHealth(healthy, unhealthy) + return RunnerNameByHealth(healthy, unhealthy) def _create_runner( self, registration_token: str, resources: VirtualMachineResources, runner: Runner @@ -325,7 +325,7 @@ def _issue_runner_metrics(self) -> IssuedMetricEventsStats: total_stats: IssuedMetricEventsStats = {} for extracted_metrics in runner_metrics.extract( - metrics_storage_manager=shared_fs, ignore_runners=set(runner_states.healthy) + metrics_storage_manager=shared_fs, runners=set(runner_states.healthy) ): try: job_metrics = github_metrics.job( @@ -491,7 +491,7 @@ def _remove_runners(self, count: int, runners: list[Runner]) -> None: logger.info("There are no idle runners to remove.") def _cleanup_offline_runners( - self, runner_states: RunnerByHealth, all_runners: list[Runner] + self, runner_states: RunnerNameByHealth, all_runners: list[Runner] ) -> None: """Cleanup runners that are not running the github run.sh script. diff --git a/src/runner_type.py b/src/runner_type.py index ef4ce5f07..86769eafd 100644 --- a/src/runner_type.py +++ b/src/runner_type.py @@ -12,7 +12,7 @@ @dataclass -class RunnerByHealth: +class RunnerNameByHealth: """Set of runners instance by health state. 
Attributes: diff --git a/templates/env.j2 b/templates/env.j2 index c0de54aad..f7da33219 100644 --- a/templates/env.j2 +++ b/templates/env.j2 @@ -1,18 +1,4 @@ PATH=/home/ubuntu/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin -{% if proxies.http %} -HTTP_PROXY={{proxies.http}} -http_proxy={{proxies.http}} -{% endif %} -{% if proxies.https %} -HTTPS_PROXY={{proxies.https}} -https_proxy={{proxies.https}} -{% endif %} -{% if proxies.ftp_proxy %} -{% endif %} -{% if proxies.no_proxy %} -NO_PROXY={{proxies.no_proxy}} -no_proxy={{proxies.no_proxy}} -{% endif %} {% if dockerhub_mirror %} DOCKERHUB_MIRROR={{dockerhub_mirror}} CONTAINER_REGISTRY_URL={{dockerhub_mirror}} diff --git a/templates/openstack-userdata.sh.j2 b/templates/openstack-userdata.sh.j2 index 3f0ff7be6..047a62be1 100644 --- a/templates/openstack-userdata.sh.j2 +++ b/templates/openstack-userdata.sh.j2 @@ -2,6 +2,8 @@ set -e +hostnamectl set-hostname github-runner + # Write .env contents su - ubuntu -c 'cd ~/actions-runner && echo "{{ env_contents }}" > .env' diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index d493b9485..fbfa2dedc 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -93,7 +93,7 @@ def test_id_fixture() -> str: def app_name_fixture(existing_app: Optional[str], test_id: str) -> str: """Randomized application name.""" # Randomized app name to avoid collision when runner is connecting to GitHub. 
- return existing_app or f"integration-id{test_id}" + return existing_app or f"test-{secrets.token_hex(4)}" @pytest.fixture(scope="module", name="charm_file") @@ -311,6 +311,22 @@ def openstack_config_fixture( ) +@pytest.fixture(scope="module", name="openstack_test_image") +def openstack_test_image_fixture(pytestconfig: pytest.Config) -> str: + """Image for testing openstack interfaces.""" + test_image = pytestconfig.getoption("--openstack-test-image") + assert test_image, "Please specify the --openstack-test-image command line option" + return test_image + + +@pytest.fixture(scope="module", name="openstack_test_flavor") +def openstack_test_flavor_fixture(pytestconfig: pytest.Config) -> str: + """Flavor for testing openstack interfaces.""" + test_flavor = pytestconfig.getoption("--openstack-test-flavor") + assert test_flavor, "Please specify the --openstack-test-flavor command line option" + return test_flavor + + @pytest.fixture(scope="module", name="openstack_connection") def openstack_connection_fixture( clouds_yaml_contents: str, app_name: str diff --git a/tests/integration/helpers/common.py b/tests/integration/helpers/common.py index 658403506..f0a107bc2 100644 --- a/tests/integration/helpers/common.py +++ b/tests/integration/helpers/common.py @@ -375,7 +375,7 @@ def _is_workflow_run_complete(run: WorkflowRun) -> bool: async def dispatch_workflow( - app: Application, + app: Application | None, branch: Branch, github_repository: Repository, conclusion: str, @@ -400,14 +400,16 @@ async def dispatch_workflow( Returns: The workflow run. """ + if dispatch_input is None: + assert app is not None, "If dispatch input not given the app cannot be None." + dispatch_input = {"runner": app.name} + start_time = datetime.now(timezone.utc) workflow = github_repository.get_workflow(id_or_file_name=workflow_id_or_name) # The `create_dispatch` returns True on success. 
- assert workflow.create_dispatch( - branch, dispatch_input or {"runner": app.name} - ), "Failed to create workflow" + assert workflow.create_dispatch(branch, dispatch_input), "Failed to create workflow" # There is a very small chance of selecting a run not created by the dispatch above. run: WorkflowRun | None = await wait_for( diff --git a/tests/integration/test_e2e.py b/tests/integration/test_e2e.py index b3fb311ed..bed193216 100644 --- a/tests/integration/test_e2e.py +++ b/tests/integration/test_e2e.py @@ -44,7 +44,7 @@ async def test_e2e_workflow( """ arrange: An app connected to an OpenStack cloud with no runners. act: Run e2e test workflow. - assert: + assert: No exception thrown. """ virt_type: str if instance_type == InstanceType.OPENSTACK: diff --git a/tests/integration/test_runner_manager_openstack.py b/tests/integration/test_runner_manager_openstack.py new file mode 100644 index 000000000..b20426ca0 --- /dev/null +++ b/tests/integration/test_runner_manager_openstack.py @@ -0,0 +1,427 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +"""Testing the RunnerManager class with OpenStackRunnerManager as CloudManager.""" + + +import json +from pathlib import Path +from secrets import token_hex +from typing import Iterator + +import pytest +import pytest_asyncio +import yaml +from github.Branch import Branch +from github.Repository import Repository +from github.Workflow import Workflow +from openstack.connection import Connection as OpenstackConnection + +from charm_state import GithubPath, ProxyConfig, parse_github_path +from manager.cloud_runner_manager import CloudRunnerState, GitHubRunnerConfig, SupportServiceConfig +from manager.github_runner_manager import GitHubRunnerState +from manager.runner_manager import FlushMode, RunnerManager, RunnerManagerConfig +from metrics import events, storage +from openstack_cloud.openstack_cloud import _CLOUDS_YAML_PATH +from openstack_cloud.openstack_runner_manager import ( + OpenStackCloudConfig, + OpenstackRunnerManager, + OpenStackServerConfig, +) +from tests.integration.helpers.common import ( + DISPATCH_WAIT_TEST_WORKFLOW_FILENAME, + dispatch_workflow, + wait_for, +) + + +@pytest.fixture(scope="module", name="runner_label") +def runner_label(): + return f"test-{token_hex(6)}" + + +@pytest.fixture(scope="module", name="log_dir_base_path") +def log_dir_base_path_fixture( + tmp_path_factory: pytest.TempPathFactory, +) -> Iterator[dict[str, Path]]: + """Mock the log directory path and return it.""" + with pytest.MonkeyPatch.context() as monkeypatch: + temp_log_dir = tmp_path_factory.mktemp("log") + + filesystem_base_path = temp_log_dir / "runner-fs" + filesystem_quarantine_path = temp_log_dir / "runner-fs-quarantine" + metric_log_path = temp_log_dir / "metric_log" + + monkeypatch.setattr(storage, "FILESYSTEM_BASE_PATH", filesystem_base_path) + monkeypatch.setattr(storage, "FILESYSTEM_QUARANTINE_PATH", filesystem_quarantine_path) + monkeypatch.setattr(events, "METRICS_LOG_PATH", metric_log_path) + + yield { + "filesystem_base_path": filesystem_base_path, + 
"filesystem_quarantine_path": filesystem_quarantine_path, + "metric_log": metric_log_path, + } + + +@pytest.fixture(scope="module", name="github_path") +def github_path_fixture(path: str) -> GithubPath: + return parse_github_path(path, "Default") + + +@pytest.fixture(scope="module", name="proxy_config") +def openstack_proxy_config_fixture( + openstack_http_proxy: str, openstack_https_proxy: str, openstack_no_proxy: str +) -> ProxyConfig: + use_aproxy = False + if openstack_http_proxy or openstack_https_proxy: + use_aproxy = True + http_proxy = openstack_http_proxy if openstack_http_proxy else None + https_proxy = openstack_https_proxy if openstack_https_proxy else None + return ProxyConfig( + http=http_proxy, + https=https_proxy, + no_proxy=openstack_no_proxy, + use_aproxy=use_aproxy, + ) + + +@pytest_asyncio.fixture(scope="module", name="openstack_runner_manager") +async def openstack_runner_manager_fixture( + app_name: str, + private_endpoint_clouds_yaml: str, + openstack_test_image: str, + flavor_name: str, + network_name: str, + github_path: GithubPath, + proxy_config: ProxyConfig, + runner_label: str, + openstack_connection: OpenstackConnection, +) -> OpenstackRunnerManager: + """Create OpenstackRunnerManager instance. + + The prefix args of OpenstackRunnerManager set to app_name to let openstack_connection_fixture + perform the cleanup of openstack resources. 
+ """ + _CLOUDS_YAML_PATH.unlink(missing_ok=True) + clouds_config = yaml.safe_load(private_endpoint_clouds_yaml) + + cloud_config = OpenStackCloudConfig( + clouds_config=clouds_config, + cloud="testcloud", + ) + server_config = OpenStackServerConfig( + image=openstack_test_image, + flavor=flavor_name, + network=network_name, + ) + runner_config = GitHubRunnerConfig( + github_path=github_path, + labels=["openstack_test", runner_label], + ) + service_config = SupportServiceConfig( + proxy_config=proxy_config, + dockerhub_mirror=None, + ssh_debug_connections=None, + repo_policy_url=None, + repo_policy_token=None, + ) + return OpenstackRunnerManager( + app_name, cloud_config, server_config, runner_config, service_config + ) + + +@pytest_asyncio.fixture(scope="module", name="runner_manager") +async def runner_manager_fixture( + openstack_runner_manager: OpenstackRunnerManager, + token: str, + github_path: GithubPath, + log_dir_base_path: dict[str, Path], +) -> RunnerManager: + """Get RunnerManager instance. + + Import of log_dir_base_path to monkeypatch the runner logs path with tmp_path. 
+ """ + config = RunnerManagerConfig(token, github_path) + return RunnerManager(openstack_runner_manager, config) + + +@pytest_asyncio.fixture(scope="function", name="runner_manager_with_one_runner") +async def runner_manager_with_one_runner_fixture(runner_manager: RunnerManager) -> RunnerManager: + runner_manager.create_runners(1) + runner_list = runner_manager.get_runners() + try: + await wait_runner_amount(runner_manager, 1) + except TimeoutError as err: + raise AssertionError("Test arrange failed: Expect one runner") from err + + runner = runner_list[0] + assert ( + runner.cloud_state == CloudRunnerState.ACTIVE + ), "Test arrange failed: Expect runner in active state" + try: + await wait_for( + lambda: runner_manager.get_runners()[0].github_state == GitHubRunnerState.IDLE, + timeout=120, + check_interval=10, + ) + except TimeoutError as err: + raise AssertionError("Test arrange failed: Expect runner in idle state") from err + return runner_manager + + +def workflow_is_status(workflow: Workflow, status: str) -> bool: + """Check if workflow in provided status. + + Args: + workflow: The workflow to check. + status: The status to check for. + + Returns: + Whether the workflow is in the status. + """ + workflow.update() + return workflow.status == status + + +async def wait_runner_amount(runner_manager: RunnerManager, num: int): + """Wait until the runner manager has the number of runners. + + A TimeoutError will be thrown if runners amount is not correct after timeout. + + Args: + runner_manager: The RunnerManager to check. + num: Number of runner to check for. + """ + runner_list = runner_manager.get_runners() + assert isinstance(runner_list, tuple) + if len(runner_list) == num: + return + + # The openstack server can take sometime to fully clean up or create. 
+ await wait_for(lambda: len(runner_manager.get_runners()) == num) + + +@pytest.mark.openstack +@pytest.mark.asyncio +@pytest.mark.abort_on_fail +async def test_get_no_runner(runner_manager: RunnerManager) -> None: + """ + Arrange: RunnerManager instance with no runners. + Act: Get runners. + Assert: Empty tuple returned. + """ + runner_list = runner_manager.get_runners() + assert isinstance(runner_list, tuple) + assert not runner_list + + +@pytest.mark.openstack +@pytest.mark.asyncio +@pytest.mark.abort_on_fail +async def test_runner_normal_idle_lifecycle( + runner_manager: RunnerManager, openstack_runner_manager: OpenstackRunnerManager +) -> None: + """ + Arrange: RunnerManager instance with no runners. + Act: + 1. Create one runner. + 2. Run health check on the runner. + 3. Run cleanup. + 4. Delete all idle runner. + Assert: + 1. An active idle runner. + 2. Health check passes. + 3. One idle runner remains. + 4. No runners. + """ + # 1. + runner_id_list = runner_manager.create_runners(1) + assert isinstance(runner_id_list, tuple) + assert len(runner_id_list) == 1 + runner_id = runner_id_list[0] + + try: + await wait_runner_amount(runner_manager, 1) + except TimeoutError as err: + raise AssertionError("Test arrange failed: Expect one runner") from err + + runner_list = runner_manager.get_runners() + assert isinstance(runner_list, tuple) + assert len(runner_list) == 1 + runner = runner_list[0] + assert runner.instance_id == runner_id + assert runner.cloud_state == CloudRunnerState.ACTIVE + # Update on GitHub-side can take a bit of time. + await wait_for( + lambda: runner_manager.get_runners()[0].github_state == GitHubRunnerState.IDLE, + timeout=120, + check_interval=10, + ) + + # 2. + openstack_instances = openstack_runner_manager._openstack_cloud.get_instances() + assert len(openstack_instances) == 1, "Test arrange failed: Needs one runner." + runner = openstack_instances[0] + + assert openstack_runner_manager._health_check(runner) + + # 3. 
+    runner_manager.cleanup()
+    runner_list = runner_manager.get_runners()
+    assert isinstance(runner_list, tuple)
+    assert len(runner_list) == 1
+    runner = runner_list[0]
+    assert runner.instance_id == runner_id
+    assert runner.cloud_state == CloudRunnerState.ACTIVE
+
+    # 4.
+    runner_manager.flush_runners(flush_mode=FlushMode.FLUSH_IDLE)
+    await wait_runner_amount(runner_manager, 0)
+
+
+@pytest.mark.openstack
+@pytest.mark.asyncio
+@pytest.mark.abort_on_fail
+async def test_runner_flush_busy_lifecycle(
+    runner_manager_with_one_runner: RunnerManager,
+    test_github_branch: Branch,
+    github_repository: Repository,
+    runner_label: str,
+):
+    """
+    Arrange: RunnerManager with one idle runner.
+    Act:
+        1. Run a long workflow.
+        3. Run flush idle runner.
+        4. Run flush busy runner.
+    Assert:
+        1. Runner takes the job and becomes busy.
+        3. Busy runner still exists.
+        4. No runners exist.
+    """
+    # 1.
+    workflow = await dispatch_workflow(
+        app=None,
+        branch=test_github_branch,
+        github_repository=github_repository,
+        conclusion="success",
+        workflow_id_or_name=DISPATCH_WAIT_TEST_WORKFLOW_FILENAME,
+        dispatch_input={"runner": runner_label, "minutes": "30"},
+        wait=False,
+    )
+    await wait_for(lambda: workflow_is_status(workflow, "in_progress"))
+
+    runner_list = runner_manager_with_one_runner.get_runners()
+    assert len(runner_list) == 1
+    busy_runner = runner_list[0]
+    assert busy_runner.cloud_state == CloudRunnerState.ACTIVE
+    assert busy_runner.github_state == GitHubRunnerState.BUSY
+
+    # 2.
+    runner_manager_with_one_runner.cleanup()
+    runner_list = runner_manager_with_one_runner.get_runners()
+    assert isinstance(runner_list, tuple)
+    assert len(runner_list) == 1
+    runner = runner_list[0]
+    assert runner.cloud_state == CloudRunnerState.ACTIVE
+    assert busy_runner.github_state == GitHubRunnerState.BUSY
+
+    # 3.
+ runner_manager_with_one_runner.flush_runners(flush_mode=FlushMode.FLUSH_IDLE) + runner_list = runner_manager_with_one_runner.get_runners() + assert len(runner_list) == 1 + busy_runner = runner_list[0] + assert busy_runner.cloud_state == CloudRunnerState.ACTIVE + assert busy_runner.github_state == GitHubRunnerState.BUSY + + # 4. + runner_manager_with_one_runner.flush_runners(flush_mode=FlushMode.FLUSH_BUSY) + await wait_runner_amount(runner_manager_with_one_runner, 0) + + +@pytest.mark.openstack +@pytest.mark.asyncio +@pytest.mark.abort_on_fail +async def test_runner_normal_lifecycle( + runner_manager_with_one_runner: RunnerManager, + test_github_branch: Branch, + github_repository: Repository, + runner_label: str, + log_dir_base_path: dict[str, Path], +): + """ + Arrange: RunnerManager with one runner. Clean metric logs. + Act: + 1. Start a test workflow for the runner. + 2. Run cleanup. + Assert: + 1. The workflow complete successfully. + 2. The runner should be deleted. The metrics should be recorded. 
+ """ + metric_log_path = log_dir_base_path["metric_log"] + metric_log_existing_content = metric_log_path.read_text(encoding="utf-8") + + workflow = await dispatch_workflow( + app=None, + branch=test_github_branch, + github_repository=github_repository, + conclusion="success", + workflow_id_or_name=DISPATCH_WAIT_TEST_WORKFLOW_FILENAME, + dispatch_input={"runner": runner_label, "minutes": "0"}, + wait=False, + ) + await wait_for(lambda: workflow_is_status(workflow, "completed")) + + issue_metrics_events = runner_manager_with_one_runner.cleanup() + assert issue_metrics_events[events.RunnerStart] == 1 + assert issue_metrics_events[events.RunnerStop] == 1 + + metric_log_full_content = metric_log_path.read_text(encoding="utf-8") + assert metric_log_full_content.startswith( + metric_log_existing_content + ), "The metric log was modified in ways other than appending" + metric_log_new_content = metric_log_full_content[len(metric_log_existing_content) :] + metric_logs = [json.loads(metric) for metric in metric_log_new_content.splitlines()] + assert ( + len(metric_logs) == 2 + ), "Assuming two events should be runner_start and runner_stop, modify this if new events are added" + assert metric_logs[0]["event"] == "runner_start" + assert metric_logs[0]["workflow"] == "Workflow Dispatch Wait Tests" + assert metric_logs[1]["event"] == "runner_stop" + assert metric_logs[1]["workflow"] == "Workflow Dispatch Wait Tests" + + await wait_runner_amount(runner_manager_with_one_runner, 0) + + +@pytest.mark.openstack +@pytest.mark.asyncio +@pytest.mark.abort_on_fail +async def test_runner_spawn_two( + runner_manager: RunnerManager, openstack_runner_manager: OpenstackRunnerManager +) -> None: + """ + Arrange: RunnerManager instance with no runners. + Act: + 1. Create two runner. + 2. Delete all idle runner. + Assert: + 1. Two active idle runner. + 2. No runners. + """ + # 1. 
+ runner_id_list = runner_manager.create_runners(2) + assert isinstance(runner_id_list, tuple) + assert len(runner_id_list) == 2 + + try: + await wait_runner_amount(runner_manager, 2) + except TimeoutError as err: + raise AssertionError("Test arrange failed: Expect two runner") from err + + runner_list = runner_manager.get_runners() + assert isinstance(runner_list, tuple) + assert len(runner_list) == 2 + + # 3. + runner_manager.flush_runners(flush_mode=FlushMode.FLUSH_IDLE) + await wait_runner_amount(runner_manager, 0) diff --git a/tests/unit/metrics/test_runner.py b/tests/unit/metrics/test_runner.py index 02b5ad028..bf0a14251 100644 --- a/tests/unit/metrics/test_runner.py +++ b/tests/unit/metrics/test_runner.py @@ -170,9 +170,7 @@ def test_extract(runner_fs_base: Path): metrics_storage_manager.list_all.return_value = [runner1_fs, runner2_fs, runner3_fs] extracted_metrics = list( - runner_metrics.extract( - metrics_storage_manager=metrics_storage_manager, ignore_runners=set() - ) + runner_metrics.extract(metrics_storage_manager=metrics_storage_manager, runners=set()) ) assert extracted_metrics == [ @@ -218,7 +216,7 @@ def test_extract_ignores_runners(runner_fs_base: Path): extracted_metrics = list( runner_metrics.extract( - metrics_storage_manager=metrics_storage_manager, ignore_runners=ignore_runners + metrics_storage_manager=metrics_storage_manager, runners=ignore_runners ) ) @@ -253,9 +251,7 @@ def test_extract_corrupt_data(runner_fs_base: Path, monkeypatch: pytest.MonkeyPa monkeypatch.setattr(runner_metrics, "move_to_quarantine", move_to_quarantine_mock) extracted_metrics = list( - runner_metrics.extract( - metrics_storage_manager=metrics_storage_manager, ignore_runners=set() - ) + runner_metrics.extract(metrics_storage_manager=metrics_storage_manager, runners=set()) ) assert not extracted_metrics @@ -275,9 +271,7 @@ def test_extract_corrupt_data(runner_fs_base: Path, monkeypatch: pytest.MonkeyPa metrics_storage_manager.list_all.return_value = [runner_fs] 
extracted_metrics = list( - runner_metrics.extract( - metrics_storage_manager=metrics_storage_manager, ignore_runners=set() - ) + runner_metrics.extract(metrics_storage_manager=metrics_storage_manager, runners=set()) ) assert not extracted_metrics move_to_quarantine_mock.assert_any_call(metrics_storage_manager, runner_fs.runner_name) @@ -296,9 +290,7 @@ def test_extract_corrupt_data(runner_fs_base: Path, monkeypatch: pytest.MonkeyPa metrics_storage_manager.list_all.return_value = [runner_fs] extracted_metrics = list( - runner_metrics.extract( - metrics_storage_manager=metrics_storage_manager, ignore_runners=set() - ) + runner_metrics.extract(metrics_storage_manager=metrics_storage_manager, runners=set()) ) assert not extracted_metrics move_to_quarantine_mock.assert_any_call(metrics_storage_manager, runner_fs.runner_name) @@ -317,9 +309,7 @@ def test_extract_corrupt_data(runner_fs_base: Path, monkeypatch: pytest.MonkeyPa metrics_storage_manager.list_all.return_value = [runner_fs] extracted_metrics = list( - runner_metrics.extract( - metrics_storage_manager=metrics_storage_manager, ignore_runners=set() - ) + runner_metrics.extract(metrics_storage_manager=metrics_storage_manager, runners=set()) ) assert not extracted_metrics @@ -357,9 +347,7 @@ def test_extract_raises_error_for_too_large_files( monkeypatch.setattr(runner_metrics, "move_to_quarantine", move_to_quarantine_mock) extracted_metrics = list( - runner_metrics.extract( - metrics_storage_manager=metrics_storage_manager, ignore_runners=set() - ) + runner_metrics.extract(metrics_storage_manager=metrics_storage_manager, runners=set()) ) assert not extracted_metrics @@ -381,9 +369,7 @@ def test_extract_raises_error_for_too_large_files( metrics_storage_manager.list_all.return_value = [runner_fs] extracted_metrics = list( - runner_metrics.extract( - metrics_storage_manager=metrics_storage_manager, ignore_runners=set() - ) + runner_metrics.extract(metrics_storage_manager=metrics_storage_manager, runners=set()) ) 
assert not extracted_metrics @@ -406,9 +392,7 @@ def test_extract_raises_error_for_too_large_files( metrics_storage_manager.list_all.return_value = [runner_fs] extracted_metrics = list( - runner_metrics.extract( - metrics_storage_manager=metrics_storage_manager, ignore_runners=set() - ) + runner_metrics.extract(metrics_storage_manager=metrics_storage_manager, runners=set()) ) assert not extracted_metrics @@ -446,9 +430,7 @@ def test_extract_ignores_filesystems_without_ts(runner_fs_base: Path): metrics_storage_manager.list_all.return_value = [runner_fs] extracted_metrics = list( - runner_metrics.extract( - metrics_storage_manager=metrics_storage_manager, ignore_runners=set() - ) + runner_metrics.extract(metrics_storage_manager=metrics_storage_manager, runners=set()) ) assert not extracted_metrics metrics_storage_manager.delete.assert_called_once_with(runner_fs.runner_name) @@ -481,7 +463,7 @@ def test_extract_ignores_failure_on_shared_fs_cleanup( ) extracted_metrics = runner_metrics.extract( - metrics_storage_manager=metrics_storage_manager, ignore_runners=set() + metrics_storage_manager=metrics_storage_manager, runners=set() ) assert list(extracted_metrics) == [runner_metrics_data] diff --git a/tests/unit/test_openstack_manager.py b/tests/unit/test_openstack_manager.py index 445a0b8d3..5329b1282 100644 --- a/tests/unit/test_openstack_manager.py +++ b/tests/unit/test_openstack_manager.py @@ -10,7 +10,7 @@ import openstack.connection import openstack.exceptions import pytest -from fabric.connection import Connection as SshConnection +from fabric.connection import Connection as SSHConnection from invoke import Result from openstack.compute.v2.keypair import Keypair from openstack.compute.v2.server import Server @@ -27,7 +27,7 @@ from openstack_cloud import openstack_manager from openstack_cloud.openstack_manager import MAX_METRICS_FILE_SIZE, METRICS_EXCHANGE_PATH from runner_manager_type import FlushMode -from runner_type import RunnerByHealth, RunnerGithubInfo +from 
runner_type import RunnerGithubInfo, RunnerNameByHealth from tests.unit import factories FAKE_MONGODB_URI = "mongodb://example.com/db" @@ -57,7 +57,7 @@ def patch_get_ssh_connection_health_check_fixture(monkeypatch: pytest.MonkeyPatc mock_get_ssh_connection = MagicMock( spec=openstack_manager.OpenstackRunnerManager._get_ssh_connection ) - mock_ssh_connection = MagicMock(spec=SshConnection) + mock_ssh_connection = MagicMock(spec=SSHConnection) mock_ssh_connection.host = "test host IP" mock_result = MagicMock(spec=Result) mock_result.ok = True @@ -79,7 +79,7 @@ def ssh_connection_health_check_fixture(monkeypatch: pytest.MonkeyPatch): mock_get_ssh_connection = MagicMock( spec=openstack_manager.OpenstackRunnerManager._get_ssh_connection ) - mock_ssh_connection = MagicMock(spec=SshConnection) + mock_ssh_connection = MagicMock(spec=SSHConnection) mock_ssh_connection.host = "test host IP" mock_result = MagicMock(spec=Result) mock_result.ok = True @@ -97,7 +97,7 @@ def patch_ssh_connection_error_fixture(monkeypatch: pytest.MonkeyPatch): mock_get_ssh_connection = MagicMock( spec=openstack_manager.OpenstackRunnerManager._get_ssh_connection ) - mock_ssh_connection = MagicMock(spec=SshConnection) + mock_ssh_connection = MagicMock(spec=SSHConnection) mock_result = MagicMock(spec=Result) mock_result.ok = False mock_result.stdout = "Mock stdout" @@ -153,7 +153,7 @@ def patched_create_connection_context_fixture(monkeypatch: pytest.MonkeyPatch): def ssh_connection_mock_fixture() -> MagicMock: """Return a mocked ssh connection.""" test_file_content = secrets.token_hex(16) - ssh_conn_mock = MagicMock(spec=openstack_manager.SshConnection) + ssh_conn_mock = MagicMock(spec=openstack_manager.SSHConnection) ssh_conn_mock.get.side_effect = lambda remote, local: Path(local).write_text(test_file_content) ssh_conn_mock.run.side_effect = lambda cmd, **kwargs: ( Result(stdout="1") if cmd.startswith("stat") else Result() @@ -287,52 +287,23 @@ def test__create_connection( 
@pytest.mark.parametrize( - "proxy_config, dockerhub_mirror, ssh_debug_connections, expected_env_contents", + "dockerhub_mirror, ssh_debug_connections, expected_env_contents", [ pytest.param( - None, None, None, """PATH=/home/ubuntu/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin - - - - LANG=C.UTF-8 ACTIONS_RUNNER_HOOK_JOB_STARTED=/home/ubuntu/actions-runner/pre-job.sh """, id="all values empty", ), pytest.param( - openstack_manager.ProxyConfig( - http="http://test.internal", - https="https://test.internal", - no_proxy="http://no_proxy.internal", - ), - None, - None, - """PATH=/home/ubuntu/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin - - - - - -LANG=C.UTF-8 -ACTIONS_RUNNER_HOOK_JOB_STARTED=/home/ubuntu/actions-runner/pre-job.sh -""", - id="proxy value set", - ), - pytest.param( - None, "http://dockerhub_mirror.test", None, """PATH=/home/ubuntu/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin - - - - DOCKERHUB_MIRROR=http://dockerhub_mirror.test CONTAINER_REGISTRY_URL=http://dockerhub_mirror.test @@ -342,7 +313,6 @@ def test__create_connection( id="dockerhub mirror set", ), pytest.param( - None, None, [ openstack_manager.SSHDebugConnection( @@ -354,10 +324,6 @@ def test__create_connection( ], """PATH=/home/ubuntu/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin - - - - LANG=C.UTF-8 ACTIONS_RUNNER_HOOK_JOB_STARTED=/home/ubuntu/actions-runner/pre-job.sh @@ -369,11 +335,6 @@ def test__create_connection( id="ssh debug connection set", ), pytest.param( - openstack_manager.ProxyConfig( - http="http://test.internal", - https="https://test.internal", - no_proxy="http://no_proxy.internal", - ), "http://dockerhub_mirror.test", [ openstack_manager.SSHDebugConnection( @@ -385,10 +346,6 @@ def test__create_connection( ], """PATH=/home/ubuntu/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin - - - - 
DOCKERHUB_MIRROR=http://dockerhub_mirror.test CONTAINER_REGISTRY_URL=http://dockerhub_mirror.test @@ -405,7 +362,6 @@ def test__create_connection( ], ) def test__generate_runner_env( - proxy_config: Optional[openstack_manager.ProxyConfig], dockerhub_mirror: Optional[str], ssh_debug_connections: Optional[list[openstack_manager.SSHDebugConnection]], expected_env_contents: str, @@ -510,7 +466,7 @@ def test_reconcile_pulls_metric_files( monkeypatch.setattr(openstack_manager.metrics_storage, "create", MagicMock(return_value=ms)) monkeypatch.setattr(openstack_manager.metrics_storage, "get", MagicMock(return_value=ms)) openstack_manager_for_reconcile._get_openstack_runner_status = MagicMock( - return_value=RunnerByHealth(healthy=(), unhealthy=("test_runner",)) + return_value=RunnerNameByHealth(healthy=(), unhealthy=("test_runner",)) ) ssh_connection_mock.get.side_effect = MagicMock() openstack_manager_for_reconcile.reconcile(quantity=0) @@ -545,7 +501,7 @@ def test_reconcile_does_not_pull_too_large_files( Result(stdout=f"{MAX_METRICS_FILE_SIZE + 1}") if cmd.startswith("stat") else Result() ) openstack_manager_for_reconcile._get_openstack_runner_status = MagicMock( - return_value=RunnerByHealth(healthy=("test_runner",), unhealthy=()) + return_value=RunnerNameByHealth(healthy=("test_runner",), unhealthy=()) ) openstack_manager_for_reconcile.reconcile(quantity=0) @@ -570,7 +526,7 @@ def test_reconcile_issue_reconciliation_metrics( monkeypatch.setattr(openstack_manager.metrics_storage, "create", MagicMock(return_value=ms)) monkeypatch.setattr(openstack_manager.metrics_storage, "get", MagicMock(return_value=ms)) openstack_manager_for_reconcile._get_openstack_runner_status = MagicMock( - return_value=RunnerByHealth(healthy=("test_runner",), unhealthy=()) + return_value=RunnerNameByHealth(healthy=("test_runner",), unhealthy=()) ) openstack_manager.runner_metrics.extract.return_value = (MagicMock() for _ in range(2)) @@ -635,7 +591,7 @@ def 
test_reconcile_ignores_metrics_for_openstack_online_runners( ] } openstack_manager_for_reconcile._get_openstack_runner_status = MagicMock( - return_value=RunnerByHealth( + return_value=RunnerNameByHealth( healthy=(runner_names["healthy_online"], runner_names["healthy_offline"]), unhealthy=( runner_names["unhealthy_online"], @@ -692,7 +648,7 @@ def test_reconcile_ignores_metrics_for_openstack_online_runners( openstack_manager.runner_metrics.extract.assert_called_once_with( metrics_storage_manager=metrics.storage, - ignore_runners=set(openstack_online_runner_names), + runners=set(openstack_online_runner_names), ) @@ -862,7 +818,7 @@ def test__ssh_health_check_error(monkeypatch: pytest.MonkeyPatch, mock_server: M mock_ssh_connection = MagicMock() mock_ssh_connection.run = MagicMock(side_effect=TimeoutError) monkeypatch.setattr( - openstack_manager, "SshConnection", MagicMock(return_value=mock_ssh_connection) + openstack_manager, "SSHConnection", MagicMock(return_value=mock_ssh_connection) ) with pytest.raises(openstack_manager._SSHError) as exc: @@ -1132,7 +1088,7 @@ def test__get_ssh_connection_server_no_valid_connections( mock_ssh_connection = MagicMock() mock_ssh_connection.run = run monkeypatch.setattr( - openstack_manager, "SshConnection", MagicMock(return_value=mock_ssh_connection) + openstack_manager, "SSHConnection", MagicMock(return_value=mock_ssh_connection) ) with pytest.raises(openstack_manager._SSHError) as exc: @@ -1164,7 +1120,7 @@ def test__get_ssh_connection_server(monkeypatch: pytest.MonkeyPatch): return_value=factories.MockSSHRunResult(exited=0, stdout="hello world") ) monkeypatch.setattr( - openstack_manager, "SshConnection", MagicMock(return_value=mock_ssh_connection) + openstack_manager, "SSHConnection", MagicMock(return_value=mock_ssh_connection) ) assert ( diff --git a/tests/unit/test_runner_manager.py b/tests/unit/test_runner_manager.py index 94d3373d4..66b09cd60 100644 --- a/tests/unit/test_runner_manager.py +++ 
b/tests/unit/test_runner_manager.py @@ -29,7 +29,7 @@ from metrics.storage import MetricsStorage from runner import Runner, RunnerStatus from runner_manager import BUILD_IMAGE_SCRIPT_FILENAME, RunnerManager, RunnerManagerConfig -from runner_type import RunnerByHealth +from runner_type import RunnerNameByHealth from tests.unit.mock import TEST_BINARY, MockLxdImageManager FAKE_MONGODB_URI = "mongodb://example.com/db" @@ -268,7 +268,7 @@ def mock_get_runners(): # Create online runners. runner_manager._get_runners = mock_get_runners - runner_manager._get_runner_health_states = lambda: RunnerByHealth( + runner_manager._get_runner_health_states = lambda: RunnerNameByHealth( ( f"{runner_manager.instance_name}-0", f"{runner_manager.instance_name}-1", @@ -433,7 +433,7 @@ def mock_get_runners(): # Create online runners. runner_manager._get_runners = mock_get_runners - runner_manager._get_runner_health_states = lambda: RunnerByHealth( + runner_manager._get_runner_health_states = lambda: RunnerNameByHealth( healthy=( online_idle_runner_name, offline_idle_runner_name,