Skip to content

Commit

Permalink
feat: openstack flush mode (#344)
Browse files Browse the repository at this point in the history
* fix: flush idle by default

* tests: add unit & integration test

* chore: update logs

* fix: update deprecated charmcraft prime

* fix: use create_server_timeout constant

* test: swap test to reduce flakiness

* chore: add docstring for building status

* test: or condition (update returns bool)

* reconcile runners for next test

* wait for workflow complete & test flush

* fix lint
  • Loading branch information
yanksyoon authored Aug 5, 2024
1 parent 41d16a7 commit b70a535
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 46 deletions.
11 changes: 8 additions & 3 deletions charmcraft.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@ parts:
- libssl-dev # for cryptography
- rust-all # for cryptography
- pkg-config # for cryptography
scripts:
plugin: dump
source: scripts
organize:
build-lxd-image.sh: scripts/build-lxd-image.sh
reactive_runner.py: scripts/reactive_runner.py
repo_policy_compliance_service.py: scripts/repo_policy_compliance_service.py
prime:
- scripts/build-lxd-image.sh
- scripts/reactive_runner.py
- scripts/repo_policy_compliance_service.py
- scripts/
bases:
- build-on:
- name: "ubuntu"
Expand Down
86 changes: 73 additions & 13 deletions src/openstack_cloud/openstack_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import time
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
from multiprocessing import Pool
from pathlib import Path
from typing import Iterable, Iterator, Literal, Optional, cast
Expand All @@ -32,7 +33,6 @@
import openstack.image.v2.image
import paramiko
from fabric import Connection as SshConnection
from invoke.runners import Result
from openstack.compute.v2.server import Server
from openstack.connection import Connection as OpenstackConnection
from openstack.exceptions import SDKException
Expand Down Expand Up @@ -61,7 +61,7 @@
from metrics.runner import RUNNER_INSTALLED_TS_FILE_NAME
from repo_policy_compliance_client import RepoPolicyComplianceClient
from runner_manager import IssuedMetricEventsStats
from runner_manager_type import OpenstackRunnerManagerConfig
from runner_manager_type import FlushMode, OpenstackRunnerManagerConfig
from runner_type import GithubPath, RunnerByHealth, RunnerGithubInfo
from utilities import retry, set_env_var

Expand Down Expand Up @@ -458,14 +458,19 @@ def _get_openstack_instances(self, conn: OpenstackConnection) -> list[Server]:
]

@staticmethod
def _health_check(conn: OpenstackConnection, server_name: str, startup: bool = False) -> bool:
def _health_check(
conn: OpenstackConnection,
server_name: str,
startup: bool = False,
) -> bool:
"""Health check a server instance.
A healthy server is defined as:
1. Openstack instance status is ACTIVE or BUILDING.
2. Runner.Worker exists (running a job).
3. Runner.Listener exists (waiting for job).
3. GitHub runner status is Idle or Active.
2. Openstack instance status is in BUILDING less than CREATE_SERVER_TIMEOUT seconds.
3. Runner.Worker exists (running a job).
4. Runner.Listener exists (waiting for job).
5. GitHub runner status is Idle or Active.
An undetermined server is marked as healthy when:
1. SSH fails - could be a transient network error.
Expand All @@ -487,6 +492,11 @@ def _health_check(conn: OpenstackConnection, server_name: str, startup: bool = F
return False
if server.status not in (_INSTANCE_STATUS_ACTIVE, _INSTANCE_STATUS_BUILDING):
return False
created_at = datetime.strptime(server.created_at, "%Y-%m-%dT%H:%M:%SZ")
current_time = datetime.now(created_at.tzinfo)
elapsed_min = (created_at - current_time).total_seconds()
if server.status == _INSTANCE_STATUS_BUILDING:
return elapsed_min < CREATE_SERVER_TIMEOUT
return OpenstackRunnerManager._ssh_health_check(
conn=conn, server_name=server_name, startup=startup
)
Expand Down Expand Up @@ -528,8 +538,7 @@ def _ssh_health_check(conn: OpenstackConnection, server_name: str, startup: bool
if RUNNER_WORKER_PROCESS in result.stdout or RUNNER_LISTENER_PROCESS in result.stdout:
return True

logger.error("[ALERT] Health check failed for server: %s", server_name)
return True
return False

@staticmethod
@retry(tries=3, delay=5, max_delay=60, backoff=2, local_logger=logger)
Expand Down Expand Up @@ -1189,7 +1198,7 @@ def _run_github_removal_script(
) from exc

try:
result: Result = ssh_conn.run(
result: invoke.runners.Result = ssh_conn.run(
f"{_CONFIG_SCRIPT_PATH} remove --token {remove_token}",
warn=True,
)
Expand Down Expand Up @@ -1509,20 +1518,71 @@ def _issue_reconciliation_metric(
except IssueMetricEventError:
logger.exception("Failed to issue Reconciliation metric")

def flush(self, mode: FlushMode = FlushMode.FLUSH_IDLE) -> int:
    """Flush Openstack servers.

    1. Kill the runner processes depending on flush mode.
    2. Get unhealthy runners after process purging.
    3. Delete unhealthy runners.

    Args:
        mode: The mode to determine which runner to flush.

    Returns:
        The number of runners flushed.
    """
    logger.info("Flushing OpenStack all runners")
    with _create_connection(self._cloud_config) as conn:
        # Killing the listener (and, for FLUSH_BUSY, the worker) process makes the
        # runner fail its health check, so it shows up in the unhealthy set below.
        self._kill_runner_processes(conn=conn, mode=mode)
        runner_by_health = self._get_openstack_runner_status(conn)
        remove_token = self._github.get_runner_remove_token(path=self._config.path)
        # Only the unhealthy runners are removed: healthy runners either kept their
        # processes (idle flush with a busy worker) or are still serving a job.
        self._remove_runners(
            conn=conn,
            instance_names=runner_by_health.unhealthy,
            remove_token=remove_token,
        )
        return len(runner_by_health.unhealthy)

def _kill_runner_processes(self, conn: OpenstackConnection, mode: FlushMode) -> None:
    """Kill runner application processes on every instance, per flush mode.

    A runner that has not yet picked up a job has:
    1. no Runner.Worker process
    2. no pre-run.sh job process

    Args:
        conn: The connection object to access OpenStack cloud.
        mode: The flush mode to determine which runner processes to kill.

    Raises:
        NotImplementedError: If unsupported flush mode has been passed.
    """
    killer_command: str
    match mode:
        case FlushMode.FLUSH_IDLE:
            # only kill Runner.Listener if Runner.Worker does not exist.
            # `pgrep -x` matches the exact process name; the leading `!` makes the
            # chain a no-op (non-zero exit, caught by warn=True) on busy runners.
            killer_command = (
                "! pgrep -x Runner.Worker && pgrep -x Runner.Listener && "
                "kill $(pgrep -x Runner.Listener)"
            )
        case FlushMode.FLUSH_BUSY:
            # kill both Runner.Listener and Runner.Worker processes.
            # This kills pre-job.sh, a child process of Runner.Worker.
            killer_command = (
                "pgrep -x Runner.Listener && kill $(pgrep -x Runner.Listener);"
                "pgrep -x Runner.Worker && kill $(pgrep -x Runner.Worker);"
            )
        case _:
            raise NotImplementedError(f"Unsupported flush mode {mode}")

    servers = self._get_openstack_instances(conn=conn)
    for server in servers:
        # NOTE(review): an SSH connection failure here would propagate and abort
        # the remaining servers — confirm _get_ssh_connection retries/handles this.
        ssh_conn: SshConnection = self._get_ssh_connection(conn=conn, server_name=server.name)
        # warn=True: a non-zero exit (e.g. no matching process) is reported via
        # result.ok instead of raising, so one idle/busy mismatch doesn't abort.
        result: invoke.runners.Result = ssh_conn.run(
            killer_command,
            warn=True,
        )
        if not result.ok:
            logger.warning("Failed to kill runner process. Instance: %s", server.name)
            continue
        logger.info("Successfully killed runner process. Instance: %s", server.name)
4 changes: 2 additions & 2 deletions src/runner_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,11 +672,11 @@ def flush(self, mode: FlushMode = FlushMode.FLUSH_IDLE) -> int:
)
)

if mode in {
if mode in (
FlushMode.FLUSH_BUSY_WAIT_REPO_CHECK,
FlushMode.FLUSH_BUSY,
FlushMode.FORCE_FLUSH_WAIT_REPO_CHECK,
}:
):
busy_runners = [runner for runner in self._get_runners() if runner.status.exist]

logger.info("Removing existing %i busy local runners", len(runners))
Expand Down
3 changes: 3 additions & 0 deletions src/runner_manager_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
class FlushMode(Enum):
"""Strategy for flushing runners.
During pre-job (repo-check), the runners are marked as idle and if the pre-job fails, the
runner falls back to being idle again. Hence wait_repo_check is required.
Attributes:
FLUSH_IDLE: Flush only idle runners.
FLUSH_IDLE_WAIT_REPO_CHECK: Flush only idle runners, then wait until repo-policy-check is
Expand Down
12 changes: 8 additions & 4 deletions tests/integration/helpers/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,8 @@ async def dispatch_workflow(
conclusion: str,
workflow_id_or_name: str,
dispatch_input: dict | None = None,
):
wait: bool = True,
) -> WorkflowRun:
"""Dispatch a workflow on a branch for the runner to run.
The function assumes that there is only one runner running in the unit.
Expand All @@ -394,9 +395,10 @@ async def dispatch_workflow(
workflow_id_or_name: The workflow filename in .github/workflows in main branch to run or
its id.
dispatch_input: Workflow input values.
wait: Whether to wait for runner to run workflow until completion.
Returns:
A completed workflow.
The workflow run.
"""
start_time = datetime.now(timezone.utc)

Expand All @@ -413,14 +415,16 @@ async def dispatch_workflow(
timeout=10 * 60,
)
assert run, f"Run not found for workflow: {workflow.name} ({workflow.id})"
await wait_for(partial(_is_workflow_run_complete, run=run), timeout=60 * 30, check_interval=60)

if not wait:
return run
await wait_for(partial(_is_workflow_run_complete, run=run), timeout=60 * 30, check_interval=60)
# The run object is updated by _is_workflow_run_complete function above.
assert (
run.conclusion == conclusion
), f"Unexpected run conclusion, expected: {conclusion}, got: {run.conclusion}"

return workflow
return run


P = ParamSpec("P")
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_charm_base_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ async def test_runner_base_image(
assert "noble" in str(stdout)

# Workflow completes successfully
workflow = await dispatch_workflow(
workflow_run = await dispatch_workflow(
app=app_no_wait,
branch=test_github_branch,
github_repository=github_repository,
conclusion="success",
workflow_id_or_name=DISPATCH_E2E_TEST_RUN_WORKFLOW_FILENAME,
dispatch_input={"runner-tag": app_no_wait.name, "runner-virt-type": "lxd"},
)
await wait_for(lambda: workflow.get_runs()[0].status == "completed")
await wait_for(lambda: workflow_run.update() and workflow_run.status == "completed")
36 changes: 34 additions & 2 deletions tests/integration/test_charm_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest_asyncio
from github.Branch import Branch
from github.Repository import Repository
from juju.action import Action
from juju.application import Application
from juju.model import Model

Expand All @@ -20,8 +21,10 @@
from tests.integration.helpers import lxd
from tests.integration.helpers.common import (
DISPATCH_TEST_WORKFLOW_FILENAME,
DISPATCH_WAIT_TEST_WORKFLOW_FILENAME,
InstanceHelper,
dispatch_workflow,
wait_for,
)
from tests.integration.helpers.openstack import OpenStackInstanceHelper, setup_repo_policy

Expand Down Expand Up @@ -62,7 +65,10 @@ async def test_check_runner(app: Application) -> None:
@pytest.mark.asyncio
@pytest.mark.abort_on_fail
async def test_flush_runner_and_resource_config(
app: Application, instance_type: InstanceType
app: Application,
instance_type: InstanceType,
github_repository: Repository,
test_github_branch: Branch,
) -> None:
"""
arrange: A working application with one runner.
Expand All @@ -71,6 +77,7 @@ async def test_flush_runner_and_resource_config(
2. Nothing.
3. Change the virtual machine resource configuration.
4. Run flush_runner action.
5. Dispatch a workflow to make runner busy and call flush_runner action.
assert:
1. One runner exists.
Expand All @@ -79,13 +86,14 @@ async def test_flush_runner_and_resource_config(
4. a. The runner name should be different to the runner prior running
the action.
b. LXD profile matching virtual machine resources of step 2 exists.
5. The runner is not flushed since by default it flushes idle.
Test are combined to reduce number of runner spawned.
"""
unit = app.units[0]

# 1.
action = await app.units[0].run_action("check-runners")
action: Action = await app.units[0].run_action("check-runners")
await action.wait()

assert action.status == "completed"
Expand Down Expand Up @@ -129,6 +137,30 @@ async def test_flush_runner_and_resource_config(
assert len(new_runner_names) == 1
assert new_runner_names[0] != runner_names[0]

# 5.
workflow = await dispatch_workflow(
app=app,
branch=test_github_branch,
github_repository=github_repository,
conclusion="success",
workflow_id_or_name=DISPATCH_WAIT_TEST_WORKFLOW_FILENAME,
dispatch_input={"runner": app.name, "minutes": "5"},
wait=False,
)
await wait_for(lambda: workflow.update() or workflow.status == "in_progress")
action = await app.units[0].run_action("flush-runners")
await action.wait()

assert action.status == "completed"
assert action.results["delta"]["virtual-machines"] == "0"

await wait_for(lambda: workflow.update() or workflow.status == "completed")
action = await app.units[0].run_action("flush-runners")
await action.wait()

assert action.status == "completed"
assert action.results["delta"]["virtual-machines"] == "1"


@pytest.mark.openstack
@pytest.mark.asyncio
Expand Down
21 changes: 3 additions & 18 deletions tests/integration/test_debug_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@

"""Integration tests for github-runner charm with ssh-debug integration."""
import logging
from datetime import datetime, timedelta

from dateutil.tz import tzutc
from github.Branch import Branch
from github.Repository import Repository
from github.WorkflowRun import WorkflowRun
from juju.application import Application
from juju.model import Model

from charm_state import DENYLIST_CONFIG_NAME
from tests.integration.helpers.common import dispatch_workflow, get_job_logs, get_workflow_runs
from tests.integration.helpers.common import dispatch_workflow, get_job_logs
from tests.status_name import ACTIVE

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -48,29 +45,17 @@ async def test_ssh_debug(

# trigger tmate action
logger.info("Dispatching workflow_dispatch_ssh_debug.yaml workflow.")
start_time = datetime.now(tzutc())

# expect failure since the ssh workflow will timeout
workflow = await dispatch_workflow(
workflow_run = await dispatch_workflow(
app=app_no_wait,
branch=test_github_branch,
github_repository=github_repository,
conclusion="failure",
workflow_id_or_name=SSH_DEBUG_WORKFLOW_FILE_NAME,
)

# query a second before actual to ensure we query a bigger range. A branch is created per test
# module so this should only return a single result.
latest_run: WorkflowRun = next(
get_workflow_runs(
start_time=start_time - timedelta(seconds=1),
workflow=workflow,
runner_name=app_no_wait.name,
branch=test_github_branch,
)
)

logs = get_job_logs(latest_run.jobs("latest")[0])
logs = get_job_logs(workflow_run.jobs("latest")[0])

# ensure ssh connection info printed in logs.
logger.info("Logs: %s", logs)
Expand Down
Loading

0 comments on commit b70a535

Please sign in to comment.