test: move shared helpers from charm_metrics_helpers into the helpers module
yanksyoon committed Jan 19, 2024
1 parent c4e3d40 commit cb0b32b
Showing 5 changed files with 140 additions and 140 deletions.
134 changes: 1 addition & 133 deletions tests/integration/charm_metrics_helpers.py
@@ -6,122 +6,24 @@

import json
import logging
import typing
from datetime import datetime, timezone
from time import sleep

import requests
from github.Branch import Branch
from github.Repository import Repository
from github.Workflow import Workflow
from github.WorkflowJob import WorkflowJob
from github.WorkflowRun import WorkflowRun
from juju.application import Application
from juju.unit import Unit

from github_type import JobConclusion
from metrics import METRICS_LOG_PATH
from runner_metrics import PostJobStatus
from tests.integration.helpers import get_runner_name, get_runner_names, run_in_unit
from tests.integration.helpers import JOB_LOG_START_MSG_TEMPLATE, get_runner_name, run_in_unit

TEST_WORKFLOW_NAMES = [
"Workflow Dispatch Tests",
"Workflow Dispatch Crash Tests",
"Workflow Dispatch Failure Tests 2a34f8b1-41e4-4bcb-9bbf-7a74e6c482f7",
]
JOB_LOG_START_MSG_TEMPLATE = "Job is about to start running on the runner: {runner_name}"


async def _wait_until_runner_is_used_up(runner_name: str, unit: Unit):
"""Wait until the runner is used up.
Args:
runner_name: The runner name to wait for.
unit: The unit which contains the runner.
"""
for _ in range(30):
runners = await get_runner_names(unit)
if runner_name not in runners:
break
sleep(30)
else:
assert False, "Timeout while waiting for the runner to be used up"


def _get_job_logs(job: WorkflowJob) -> str:
"""Retrieve a workflow's job logs.
Args:
job: The target job to fetch the logs from.
Returns:
The job logs.
"""
logs_url = job.logs_url()
logs = requests.get(logs_url).content.decode("utf-8")
return logs


def get_workflow_runs(
start_time: datetime, workflow: Workflow, runner_name: str, branch: Branch = None
) -> typing.Generator[WorkflowRun, None, None]:
"""Fetch the latest matching runs of a workflow for a given runner.
Args:
start_time: The start time of the workflow.
workflow: The target workflow to get the run for.
runner_name: The runner name the workflow job is assigned to.
branch: The branch the workflow is run on.
"""
for run in workflow.get_runs(created=f">={start_time.isoformat()}", branch=branch):
latest_job: WorkflowJob = run.jobs()[0]
logs = _get_job_logs(job=latest_job)

if JOB_LOG_START_MSG_TEMPLATE.format(runner_name=runner_name) in logs:
yield run


async def _assert_workflow_run_conclusion(
runner_name: str, conclusion: str, workflow: Workflow, start_time: datetime
):
"""Assert that the workflow run has the expected conclusion.
Args:
runner_name: The runner name to assert the workflow run conclusion for.
conclusion: The expected workflow run conclusion.
workflow: The workflow to assert the workflow run conclusion for.
start_time: The start time of the workflow.
"""
for run in workflow.get_runs(created=f">={start_time.isoformat()}"):
latest_job: WorkflowJob = run.jobs()[0]
logs = _get_job_logs(job=latest_job)

if JOB_LOG_START_MSG_TEMPLATE.format(runner_name=runner_name) in logs:
assert latest_job.conclusion == conclusion, (
f"Job {latest_job.name} for {runner_name} expected {conclusion}, "
f"got {latest_job.conclusion}"
)


async def _wait_for_workflow_to_complete(
unit: Unit, workflow: Workflow, conclusion: str, start_time: datetime
):
"""Wait for the workflow to complete.
Args:
unit: The unit which contains the runner.
workflow: The workflow to wait for.
conclusion: The workflow conclusion to wait for.
start_time: The start time of the workflow.
"""
runner_name = await get_runner_name(unit)
await _wait_until_runner_is_used_up(runner_name, unit)
# Wait for the workflow log to contain the conclusion
sleep(60)

await _assert_workflow_run_conclusion(
runner_name=runner_name, conclusion=conclusion, workflow=workflow, start_time=start_time
)


async def _wait_for_workflow_to_start(unit: Unit, workflow: Workflow):
@@ -222,40 +124,6 @@ async def _cancel_workflow_run(unit: Unit, workflow: Workflow):
run.cancel()


async def dispatch_workflow(
app: Application,
branch: Branch,
github_repository: Repository,
conclusion: str,
workflow_id_or_name: str,
):
"""Dispatch a workflow on a branch for the runner to run.
The function assumes that there is only one runner running in the unit.
Args:
app: The charm to dispatch the workflow for.
branch: The branch to dispatch the workflow on.
github_repository: The github repository to dispatch the workflow on.
conclusion: The expected workflow run conclusion.
workflow_id_or_name: The workflow filename in .github/workflows in main branch to run or
its id.
Returns:
A completed workflow.
"""
start_time = datetime.now(timezone.utc)

workflow = github_repository.get_workflow(id_or_file_name=workflow_id_or_name)

# The `create_dispatch` returns True on success.
assert workflow.create_dispatch(branch, {"runner": app.name})
await _wait_for_workflow_to_complete(
unit=app.units[0], workflow=workflow, conclusion=conclusion, start_time=start_time
)
return workflow


async def assert_events_after_reconciliation(
app: Application, github_repository: Repository, post_job_status: PostJobStatus
):
136 changes: 136 additions & 0 deletions tests/integration/helpers.py
@@ -7,11 +7,19 @@
import json
import subprocess
import time
import typing
from asyncio import sleep
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, Union

import juju.version
import requests
import yaml
from github.Branch import Branch
from github.Repository import Repository
from github.Workflow import Workflow
from github.WorkflowJob import WorkflowJob
from github.WorkflowRun import WorkflowRun
from juju.application import Application
from juju.model import Model
from juju.unit import Unit
@@ -25,6 +33,8 @@
DISPATCH_CRASH_TEST_WORKFLOW_FILENAME = "workflow_dispatch_crash_test.yaml"
DISPATCH_FAILURE_TEST_WORKFLOW_FILENAME = "workflow_dispatch_failure_test.yaml"

JOB_LOG_START_MSG_TEMPLATE = "Job is about to start running on the runner: {runner_name}"


async def check_runner_binary_exists(unit: Unit) -> bool:
"""Checks if runner binary exists in the charm.
Expand Down Expand Up @@ -361,6 +371,132 @@ async def deploy_github_runner_charm(
return application


def _get_job_logs(job: WorkflowJob) -> str:
"""Retrieve a workflow's job logs.
Args:
job: The target job to fetch the logs from.
Returns:
The job logs.
"""
logs_url = job.logs_url()
logs = requests.get(logs_url).content.decode("utf-8")
return logs


def get_workflow_runs(
start_time: datetime, workflow: Workflow, runner_name: str, branch: Branch = None
) -> typing.Generator[WorkflowRun, None, None]:
"""Fetch the latest matching runs of a workflow for a given runner.
Args:
start_time: The start time of the workflow.
workflow: The target workflow to get the run for.
runner_name: The runner name the workflow job is assigned to.
branch: The branch the workflow is run on.
"""
for run in workflow.get_runs(created=f">={start_time.isoformat()}", branch=branch):
latest_job: WorkflowJob = run.jobs()[0]
logs = _get_job_logs(job=latest_job)

if JOB_LOG_START_MSG_TEMPLATE.format(runner_name=runner_name) in logs:
yield run


async def _wait_until_runner_is_used_up(runner_name: str, unit: Unit):
"""Wait until the runner is used up.
Args:
runner_name: The runner name to wait for.
unit: The unit which contains the runner.
"""
for _ in range(30):
runners = await get_runner_names(unit)
if runner_name not in runners:
break
await sleep(30)
else:
assert False, "Timeout while waiting for the runner to be used up"


async def _assert_workflow_run_conclusion(
runner_name: str, conclusion: str, workflow: Workflow, start_time: datetime
):
"""Assert that the workflow run has the expected conclusion.
Args:
runner_name: The runner name to assert the workflow run conclusion for.
conclusion: The expected workflow run conclusion.
workflow: The workflow to assert the workflow run conclusion for.
start_time: The start time of the workflow.
"""
for run in workflow.get_runs(created=f">={start_time.isoformat()}"):
latest_job: WorkflowJob = run.jobs()[0]
logs = _get_job_logs(job=latest_job)

if JOB_LOG_START_MSG_TEMPLATE.format(runner_name=runner_name) in logs:
assert latest_job.conclusion == conclusion, (
f"Job {latest_job.name} for {runner_name} expected {conclusion}, "
f"got {latest_job.conclusion}"
)


async def _wait_for_workflow_to_complete(
unit: Unit, workflow: Workflow, conclusion: str, start_time: datetime
):
"""Wait for the workflow to complete.
Args:
unit: The unit which contains the runner.
workflow: The workflow to wait for.
conclusion: The workflow conclusion to wait for.
start_time: The start time of the workflow.
"""
runner_name = await get_runner_name(unit)
await _wait_until_runner_is_used_up(runner_name, unit)
# Wait for the workflow log to contain the conclusion
await sleep(60)

await _assert_workflow_run_conclusion(
runner_name=runner_name, conclusion=conclusion, workflow=workflow, start_time=start_time
)


async def dispatch_workflow(
app: Application,
branch: Branch,
github_repository: Repository,
conclusion: str,
workflow_id_or_name: str,
):
"""Dispatch a workflow on a branch for the runner to run.
The function assumes that there is only one runner running in the unit.
Args:
app: The charm to dispatch the workflow for.
branch: The branch to dispatch the workflow on.
github_repository: The github repository to dispatch the workflow on.
conclusion: The expected workflow run conclusion.
workflow_id_or_name: The workflow filename in .github/workflows in main branch to run or
its id.
Returns:
A completed workflow.
"""
start_time = datetime.now(timezone.utc)

workflow = github_repository.get_workflow(id_or_file_name=workflow_id_or_name)

# The `create_dispatch` returns True on success.
assert workflow.create_dispatch(branch, {"runner": app.name})
await _wait_for_workflow_to_complete(
unit=app.units[0], workflow=workflow, conclusion=conclusion, start_time=start_time
)
return workflow


async def wait_for(
func: Callable[[], Union[Awaitable, Any]],
timeout: int = 300,
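For reference, a minimal sketch (not part of this commit) of how a test could call the relocated dispatch_workflow helper from its new home in tests/integration/helpers.py. The test name and the fixtures app, forked_github_branch and github_repository are assumptions made for illustration:

# Illustrative usage sketch; fixture names are assumed, not part of this commit.
from tests.integration.helpers import DISPATCH_TEST_WORKFLOW_FILENAME, dispatch_workflow


async def test_dispatch_example(app, forked_github_branch, github_repository):
    # Dispatch the workflow on the forked branch and wait until the run picked
    # up by this charm's runner completes with the expected conclusion.
    await dispatch_workflow(
        app=app,
        branch=forked_github_branch,
        github_repository=github_repository,
        conclusion="success",
        workflow_id_or_name=DISPATCH_TEST_WORKFLOW_FILENAME,
    )

dispatch_workflow returns the Workflow object once the run has completed, so a test can also query its runs afterwards if needed.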
2 changes: 1 addition & 1 deletion tests/integration/test_charm_metrics_failure.py
@@ -18,13 +18,13 @@
_wait_for_workflow_to_start,
assert_events_after_reconciliation,
clear_metrics_log,
dispatch_workflow,
print_loop_device_info,
wait_for_runner_to_be_marked_offline,
)
from tests.integration.helpers import (
DISPATCH_CRASH_TEST_WORKFLOW_FILENAME,
DISPATCH_FAILURE_TEST_WORKFLOW_FILENAME,
dispatch_workflow,
ensure_charm_has_runner,
get_runner_name,
reconcile,
2 changes: 1 addition & 1 deletion tests/integration/test_charm_metrics_success.py
@@ -17,12 +17,12 @@
from tests.integration.charm_metrics_helpers import (
assert_events_after_reconciliation,
clear_metrics_log,
dispatch_workflow,
get_metrics_log,
print_loop_device_info,
)
from tests.integration.helpers import (
DISPATCH_TEST_WORKFLOW_FILENAME,
dispatch_workflow,
ensure_charm_has_runner,
get_runner_name,
reconcile,
6 changes: 1 addition & 5 deletions tests/integration/test_debug_ssh.py
@@ -10,11 +10,7 @@
from github.WorkflowRun import WorkflowRun
from juju.application import Application

from tests.integration.charm_metrics_helpers import (
_get_job_logs,
dispatch_workflow,
get_workflow_runs,
)
from tests.integration.helpers import _get_job_logs, dispatch_workflow, get_workflow_runs

logger = logging.getLogger(__name__)

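For reference, a minimal sketch (not part of this commit) of how the relocated get_workflow_runs and _get_job_logs helpers can be combined after this move; it assumes workflow, runner_name and start_time are already defined by the test:

# Illustrative usage sketch; assumes workflow, runner_name and start_time exist.
from tests.integration.helpers import _get_job_logs, get_workflow_runs

for run in get_workflow_runs(
    start_time=start_time, workflow=workflow, runner_name=runner_name
):
    # Inspect the logs of the job that was picked up by this runner.
    logs = _get_job_logs(job=run.jobs()[0])
    assert "Job is about to start running on the runner" in logs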
