Skip to content

Commit

Permalink
Fix scenario tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sed-i committed Nov 12, 2024
1 parent 3daa286 commit 207bb92
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 83 deletions.
12 changes: 0 additions & 12 deletions tests/scenario/conftest.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,9 @@
import shutil
from pathlib import Path
from ops.testing import Context

from charm import GrafanaAgentK8sCharm

import pytest

CHARM_ROOT = Path(__file__).parent.parent.parent


@pytest.fixture
def vroot(tmp_path) -> Path:
root = Path(str(tmp_path.absolute()))
shutil.rmtree(root)
shutil.copytree(CHARM_ROOT / "src", root / "src")
return root


@pytest.fixture
def ctx():
Expand Down
13 changes: 4 additions & 9 deletions tests/scenario/test_dashboard_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json

from cosl import GrafanaDashboard
from scenario import Container, Context, Relation, State
from ops.testing import Container, Context, Relation, State

from charm import GrafanaAgentK8sCharm

Expand All @@ -12,7 +12,7 @@ def encode_as_dashboard(dct: dict):
return GrafanaDashboard._serialize(json.dumps(dct).encode("utf-8"))


def test_dashboard_propagation(vroot):
def test_dashboard_propagation(ctx):
# This test verifies that if the charm receives a dashboard via the requirer databag,
# it is correctly transferred to the provider databag.

Expand All @@ -29,23 +29,18 @@ def test_dashboard_propagation(vroot):
}
consumer = Relation(
"grafana-dashboards-consumer",
relation_id=1,
remote_app_data={"dashboards": json.dumps(data)},
)

provider = Relation("grafana-dashboards-provider", relation_id=2)
provider = Relation("grafana-dashboards-provider")

ctx = Context(charm_type=GrafanaAgentK8sCharm, charm_root=vroot)
state = State(
relations=[consumer, provider],
leader=True,
containers=[Container("agent", can_connect=True)],
)

with ctx.manager(
state=state,
event=consumer.changed_event,
) as mgr:
with ctx(ctx.on.relation_changed(consumer), state=state) as mgr:
dash = mgr.charm.dashboards[0]
assert dash["charm"] == expected["charm"]
assert dash["title"] == expected["title"]
Expand Down
28 changes: 8 additions & 20 deletions tests/scenario/test_setup_statuses.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,30 @@
# See LICENSE file for licensing details.

from ops import BlockedStatus, UnknownStatus, pebble
from scenario import Container, Context, ExecOutput, State
from ops.testing import Container, Context, State, Exec

import charm


def test_install(vroot):
context = Context(
charm.GrafanaAgentK8sCharm,
charm_root=vroot,
)
out = context.run("install", State())
def test_install(ctx):
out = ctx.run(ctx.on.install(), State())
assert out.unit_status == UnknownStatus()


def test_start(vroot):
context = Context(
charm.GrafanaAgentK8sCharm,
charm_root=vroot,
)
out = context.run("start", State())
def test_start(ctx):
out = ctx.run(ctx.on.start(), State())
assert out.unit_status == UnknownStatus()


def test_charm_start_with_container(vroot):
def test_charm_start_with_container(ctx):
agent = Container(
name="agent",
can_connect=True,
exec_mock={("/bin/agent", "-version"): ExecOutput(stdout="42.42")},
execs={Exec(["/bin/agent", "-version"], return_code=0, stdout="42.42")},
)

context = Context(
charm.GrafanaAgentK8sCharm,
charm_root=vroot,
)
state = State(containers=[agent])
out = context.run(agent.pebble_ready_event, state)
out = ctx.run(ctx.on.pebble_ready(agent), state)

assert out.unit_status == BlockedStatus(
"Missing incoming ('requires') relation: metrics-endpoint|logging-provider|tracing-provider|grafana-dashboards-consumer"
Expand Down
32 changes: 9 additions & 23 deletions tests/scenario/test_start_statuses.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
from pathlib import Path

from ops import pebble
from scenario import Container, Context, ExecOutput, State
from ops.testing import Container, Context, Exec, State, UnknownStatus

from charm import GrafanaAgentK8sCharm

CHARM_ROOT = Path(__file__).parent.parent.parent


@dataclasses.dataclass
class _MockProc:
Expand All @@ -21,36 +19,24 @@ def _subp_run_mock(*a, **kw):
return _MockProc(0)


def test_install(vroot):
ctx = Context(
charm_type=GrafanaAgentK8sCharm,
charm_root=vroot,
)
out = ctx.run(state=State(), event="install")
assert out.unit_status == ("unknown", "")
def test_install(ctx):
out = ctx.run(ctx.on.install(), state=State())
assert out.unit_status == UnknownStatus()


def test_start(vroot):
ctx = Context(
charm_type=GrafanaAgentK8sCharm,
charm_root=vroot,
)
out = ctx.run(state=State(), event="start")
def test_start(ctx):
out = ctx.run(ctx.on.start(), state=State())
assert out.unit_status.name == "unknown"


def test_charm_start_with_container(vroot):
def test_charm_start_with_container(ctx):
agent = Container(
name="agent",
can_connect=True,
exec_mock={("/bin/agent", "-version"): ExecOutput(stdout="42.42")},
execs={Exec(["/bin/agent", "-version"], return_code=0, stdout="42.42")},
)

ctx = Context(
charm_type=GrafanaAgentK8sCharm,
charm_root=vroot,
)
out = ctx.run(state=State(containers=[agent]), event=agent.pebble_ready_event)
out = ctx.run(ctx.on.pebble_ready(agent), state=State(containers=[agent]))

assert out.unit_status.name == "blocked"
agent_out = out.get_container("agent")
Expand Down
38 changes: 19 additions & 19 deletions tests/scenario/test_tracing_integration.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
from unittest.mock import patch

import pytest
import scenario
import yaml
from charms.tempo_k8s.v1.charm_tracing import charm_tracing_disabled
from charms.tempo_k8s.v2.tracing import Receiver, TracingProviderAppData, TracingRequirerAppData
from ops import pebble
from ops.testing import Context, State, Container, Relation

from charm import GrafanaAgentK8sCharm
from grafana_agent import CONFIG_PATH


@pytest.fixture
def ctx(vroot):
def ctx():
with charm_tracing_disabled():
with patch("socket.getfqdn", new=lambda: "localhost"):
yield scenario.Context(GrafanaAgentK8sCharm, charm_root=vroot)
yield Context(GrafanaAgentK8sCharm)


@pytest.fixture
def base_state():
yield scenario.State(
yield State(
leader=True,
containers=[
scenario.Container(
Container(
"agent",
can_connect=True,
# set it to inactive so we can detect when an event has caused it to start
Expand All @@ -35,14 +35,14 @@ def base_state():

def test_tracing_relation(ctx, base_state):
# GIVEN a tracing relation over the tracing-provider endpoint
tracing = scenario.Relation(
tracing = Relation(
"tracing-provider",
remote_app_data=TracingRequirerAppData(receivers=["otlp_http", "otlp_grpc"]).dump(),
)

state = base_state.replace(relations=[tracing])
# WHEN we process any setup event for the relation
state_out = ctx.run(tracing.changed_event, state)
state_out = ctx.run(ctx.on.relation_changed(tracing), state)

agent = state_out.get_container("agent")

Expand All @@ -58,11 +58,11 @@ def test_tracing_relation(ctx, base_state):

def test_tracing_relations_in_and_out(ctx, base_state):
# GIVEN a tracing relation over the tracing-provider endpoint and one over tracing
tracing_provider = scenario.Relation(
tracing_provider = Relation(
"tracing-provider",
remote_app_data=TracingRequirerAppData(receivers=["otlp_http", "otlp_grpc"]).dump(),
)
tracing = scenario.Relation(
tracing = Relation(
"tracing",
remote_app_data=TracingProviderAppData(
receivers=[
Expand All @@ -73,7 +73,7 @@ def test_tracing_relations_in_and_out(ctx, base_state):

state = base_state.replace(relations=[tracing, tracing_provider])
# WHEN we process any setup event for the relation
state_out = ctx.run(tracing.changed_event, state)
state_out = ctx.run(ctx.on.relation_changed(tracing), state)

agent = state_out.get_container("agent")

Expand All @@ -89,11 +89,11 @@ def test_tracing_relations_in_and_out(ctx, base_state):

def test_tracing_relation_passthrough(ctx, base_state):
# GIVEN a tracing relation over the tracing-provider endpoint and one over tracing
tracing_provider = scenario.Relation(
tracing_provider = Relation(
"tracing-provider",
remote_app_data=TracingRequirerAppData(receivers=["otlp_http", "otlp_grpc"]).dump(),
)
tracing = scenario.Relation(
tracing = Relation(
"tracing",
remote_app_data=TracingProviderAppData(
receivers=[
Expand All @@ -104,7 +104,7 @@ def test_tracing_relation_passthrough(ctx, base_state):

state = base_state.replace(relations=[tracing, tracing_provider])
# WHEN we process any setup event for the relation
state_out = ctx.run(tracing.changed_event, state)
state_out = ctx.run(ctx.on.relation_changed(tracing), state)

# THEN we act as a tracing provider for 'tracing-provider', and as requirer for 'tracing'
tracing_out = TracingRequirerAppData.load(state_out.get_relations("tracing")[0].local_app_data)
Expand Down Expand Up @@ -132,11 +132,11 @@ def test_tracing_relation_passthrough(ctx, base_state):
)
def test_tracing_relation_passthrough_with_force_enable(ctx, base_state, force_enable):
# GIVEN a tracing relation over the tracing-provider endpoint and one over tracing
tracing_provider = scenario.Relation(
tracing_provider = Relation(
"tracing-provider",
remote_app_data=TracingRequirerAppData(receivers=["otlp_http", "otlp_grpc"]).dump(),
)
tracing = scenario.Relation(
tracing = Relation(
"tracing",
remote_app_data=TracingProviderAppData(
receivers=[
Expand All @@ -151,7 +151,7 @@ def test_tracing_relation_passthrough_with_force_enable(ctx, base_state, force_e
relations=[tracing, tracing_provider],
)
# WHEN we process any setup event for the relation
state_out = ctx.run(tracing.changed_event, state)
state_out = ctx.run(ctx.on.relation_changed(tracing), state)

# THEN we act as a tracing provider for 'tracing-provider', and as requirer for 'tracing'
tracing_out = TracingRequirerAppData.load(state_out.get_relations("tracing")[0].local_app_data)
Expand Down Expand Up @@ -179,11 +179,11 @@ def test_tracing_relation_passthrough_with_force_enable(ctx, base_state, force_e
)
def test_tracing_sampling_config_is_present(ctx, base_state, sampling_config):
# GIVEN a tracing relation over the tracing-provider endpoint and one over tracing
tracing_provider = scenario.Relation(
tracing_provider = Relation(
"tracing-provider",
remote_app_data=TracingRequirerAppData(receivers=["otlp_http", "otlp_grpc"]).dump(),
)
tracing = scenario.Relation(
tracing = Relation(
"tracing",
remote_app_data=TracingProviderAppData(
receivers=[
Expand All @@ -194,7 +194,7 @@ def test_tracing_sampling_config_is_present(ctx, base_state, sampling_config):

state = base_state.replace(relations=[tracing, tracing_provider], config=sampling_config)
# WHEN we process any setup event for the relation
state_out = ctx.run(tracing.changed_event, state)
state_out = ctx.run(ctx.ok.relation_changed(tracing), state)

agent = state_out.get_container("agent")

Expand Down

0 comments on commit 207bb92

Please sign in to comment.