Skip to content

Commit

Permalink
Add testcontainers packages and move testing to the SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
dgarros committed Dec 30, 2024
1 parent 106423a commit b23d472
Show file tree
Hide file tree
Showing 17 changed files with 817 additions and 256 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ sync/dist/
helm/charts/
helm/Chart.lock

python_testcontainers/dist/*
Empty file.
Empty file.
125 changes: 0 additions & 125 deletions backend/infrahub/testing/schemas/car_person.py

This file was deleted.

88 changes: 88 additions & 0 deletions backend/tests/integration_docker/test_propose_change_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from pathlib import Path

import pytest
from infrahub_sdk import InfrahubClient
from infrahub_sdk.protocols import CoreGenericRepository, CoreProposedChange
from infrahub_sdk.schema import NodeSchema, SchemaRoot
from infrahub_sdk.testing.docker import TestInfrahubDockerClient
from infrahub_sdk.testing.repository import GitRepo
from infrahub_sdk.testing.schemas.car_person import (
TESTING_PERSON,
SchemaCarPerson,
)

from infrahub.core.constants import InfrahubKind
from tests.helpers.fixtures import get_fixtures_dir

CURRENT_DIRECTORY = Path(__file__).parent.resolve()


class TestProposeChangeRepository(TestInfrahubDockerClient, SchemaCarPerson):
@pytest.fixture(scope="class")
def infrahub_version(self) -> str:
return "local"

@pytest.fixture(scope="class")
def schema_person_artifact(self, schema_person_base: NodeSchema) -> NodeSchema:
person_schema = schema_person_base.model_copy(deep=True)
person_schema.inherit_from = [InfrahubKind.ARTIFACTTARGET]
return person_schema

@pytest.fixture(scope="class")
def initial_schema(
self,
schema_car_base: NodeSchema,
schema_person_artifact: NodeSchema,
schema_manufacturer_base: NodeSchema,
) -> SchemaRoot:
return SchemaRoot(
version="1.0",
nodes=[schema_person_artifact, schema_car_base, schema_manufacturer_base],
)

async def test_load_initial_schema(
self, default_branch: str, client: InfrahubClient, initial_schema: SchemaRoot
) -> None:
await client.schema.wait_until_converged(branch=default_branch)

resp = await client.schema.load(
schemas=[initial_schema.to_schema_dict()], branch=default_branch, wait_until_converged=True
)
assert resp.errors == {}

async def test_load_initial_data(self, client: InfrahubClient, default_branch: str, remote_repos_dir: Path) -> None:
data = await self.create_initial_data(client=client, branch=default_branch)
persons = data[TESTING_PERSON]

# Create Group People
group_people = await client.create(
kind="CoreStandardGroup", name="people", members=[item.id for item in persons]
)
await group_people.save()

# Add repositories
fixture_dir = get_fixtures_dir()
repo_name = "car-dealership"
repo_dir = fixture_dir / "repos" / repo_name / "initial__main"
repo = GitRepo(name=repo_name, src_directory=repo_dir, dst_directory=remote_repos_dir)
await repo.add_to_infrahub(client=client)
in_sync = await repo.wait_for_sync_to_complete(client=client)
assert in_sync

repos = await client.all(kind=CoreGenericRepository)
assert repos

async def test_create_propose_change(self, client: InfrahubClient, default_branch: str) -> None:
branch = await client.branch.create(branch_name="branch2")
john = client.store.get_by_hfid(key=f"{TESTING_PERSON}__John Doe", raise_when_missing=True)

john_branch = await client.get(kind=TESTING_PERSON, id=john.id, branch=branch.name)
john_branch.description.value = "new description"
await john_branch.save()

pc = await client.create(
kind=CoreProposedChange, name="pc1", source_branch=branch.name, destination_branch=default_branch
)
await pc.save()

# breakpoint()
76 changes: 40 additions & 36 deletions backend/tests/integration_docker/test_schema_migration.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import copy
from pathlib import Path
from typing import Any

import pytest
import yaml
from infrahub_sdk import InfrahubClient

from infrahub.testing.helpers import TestInfrahubDev
from infrahub.testing.schemas.car_person import (
from infrahub_sdk.schema.main import AttributeKind, AttributeSchema, NodeSchema, SchemaRoot
from infrahub_sdk.testing.docker import TestInfrahubDockerClient
from infrahub_sdk.testing.schemas.car_person import (
NAMESPACE,
TESTING_PERSON,
SchemaCarPerson,
Expand All @@ -16,73 +14,79 @@
CURRENT_DIRECTORY = Path(__file__).parent.resolve()


class TestSchemaMigrations(TestInfrahubDev, SchemaCarPerson):
class TestSchemaMigrations(TestInfrahubDockerClient, SchemaCarPerson):
@pytest.fixture(scope="class")
def infrahub_version(self) -> str:
return "local"

@pytest.fixture(scope="class")
def schema_person_mandatory_age(self, schema_person_base: dict[str, Any]) -> dict[str, Any]:
schema_person = copy.deepcopy(schema_person_base)
schema_person["attributes"].append({"name": "age", "kind": "Number", "optional": False, "default_value": 99})
def schema_person_mandatory_age(self, schema_person_base: NodeSchema) -> NodeSchema:
schema_person = schema_person_base.model_copy(deep=True)
schema_person.attributes.append(
AttributeSchema(name="age", kind=AttributeKind.NUMBER, optional=False, default_value=99)
)
return schema_person

@pytest.fixture(scope="class")
def schema_person_with_age(
self,
schema_car_base: dict[str, Any],
schema_person_mandatory_age: dict[str, Any],
schema_manufacturer_base: dict[str, Any],
) -> dict[str, Any]:
return {
"version": "1.0",
"nodes": [schema_person_mandatory_age, schema_car_base, schema_manufacturer_base],
}
schema_car_base: NodeSchema,
schema_person_mandatory_age: NodeSchema,
schema_manufacturer_base: NodeSchema,
) -> SchemaRoot:
return SchemaRoot(
version="1.0",
nodes=[schema_person_mandatory_age, schema_car_base, schema_manufacturer_base],
)

async def test_setup_initial_schema(
self, default_branch: str, infrahub_client: InfrahubClient, schema_base: dict[str, Any]
self, default_branch: str, client: InfrahubClient, schema_base: SchemaRoot
) -> None:
await infrahub_client.schema.wait_until_converged(branch=default_branch)
await client.schema.wait_until_converged(branch=default_branch)
# Validate that the schema is in sync after initial startup
assert await self.schema_in_sync(client=infrahub_client, branch=default_branch)
resp = await infrahub_client.schema.load(
schemas=[schema_base], branch=default_branch, wait_until_converged=True
assert await self.schema_in_sync(client=client, branch=default_branch)
resp = await client.schema.load(
schemas=[schema_base.to_schema_dict()], branch=default_branch, wait_until_converged=True
)
assert resp.errors == {}

await infrahub_client.schema.wait_until_converged(branch=default_branch)
await infrahub_client.schema.fetch(branch=default_branch, namespaces=[NAMESPACE])
_ = await infrahub_client.schema.get(kind=TESTING_PERSON, branch=default_branch)
await client.schema.wait_until_converged(branch=default_branch)
await client.schema.fetch(branch=default_branch, namespaces=[NAMESPACE])
_ = await client.schema.get(kind=TESTING_PERSON, branch=default_branch)

_ = await self.create_persons(client=infrahub_client, branch=default_branch)
_ = await self.create_manufacturers(client=infrahub_client, branch=default_branch)
_ = await self.create_persons(client=client, branch=default_branch)
_ = await self.create_manufacturers(client=client, branch=default_branch)

assert True

async def test_update_schema(self, infrahub_client: InfrahubClient, schema_person_with_age: dict[str, Any]) -> None:
branch = await infrahub_client.branch.create(branch_name="branch2")
resp = await infrahub_client.schema.load(schemas=[schema_person_with_age], branch=branch.name)
async def test_update_schema(self, client: InfrahubClient, schema_person_with_age: SchemaRoot) -> None:
branch = await client.branch.create(branch_name="branch2")
resp = await client.schema.load(schemas=[schema_person_with_age.to_schema_dict()], branch=branch.name)

assert resp.errors == {}

async def test_schema_load_and_delete(self, infrahub_client: InfrahubClient) -> None:
async def test_schema_load_and_delete(self, client: InfrahubClient) -> None:
with Path(CURRENT_DIRECTORY / "test_files/device_and_interface_schema.yml").open(encoding="utf-8") as file:
device_and_interface_schema = yaml.safe_load(file.read())

with Path(CURRENT_DIRECTORY / "test_files/delete_interface_schema.yml").open(encoding="utf-8") as file:
delete_interface_schema = yaml.safe_load(file.read())

device_branch = await infrahub_client.branch.create(branch_name="device_branch")
device_branch = await client.branch.create(branch_name="device_branch")

device_interface = await infrahub_client.schema.load(
device_interface = await client.schema.load(
schemas=[device_and_interface_schema], branch=device_branch.name, wait_until_converged=True
)
assert device_interface.schema_updated
# Validate that the schema is in sync after loading the device and interface schema
assert await self.schema_in_sync(client=infrahub_client, branch=device_branch.name)
assert await self.schema_in_sync(client=client, branch=device_branch.name)

delete_interface = await infrahub_client.schema.load(
delete_interface = await client.schema.load(
schemas=[delete_interface_schema], branch=device_branch.name, wait_until_converged=True
)
assert delete_interface.schema_updated
# Validate that the schema is in sync after removing the interface
assert await self.schema_in_sync(client=infrahub_client, branch=device_branch.name)
assert await self.schema_in_sync(client=client, branch=device_branch.name)

@staticmethod
async def schema_in_sync(client: InfrahubClient, branch: str | None) -> bool:
Expand Down
Loading

0 comments on commit b23d472

Please sign in to comment.