Skip to content

Commit

Permalink
Migration of schema migrations and validations to prefect
Browse files Browse the repository at this point in the history
  • Loading branch information
dgarros committed Oct 2, 2024
1 parent 5ff89c0 commit 8b34cbe
Show file tree
Hide file tree
Showing 25 changed files with 606 additions and 148 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ coverage.xml
*.env
script.py
**/*.local.*
local/*
.vscode/settings.json
node_modules/*
development/docker-compose.override.yml
Expand Down
38 changes: 26 additions & 12 deletions backend/infrahub/api/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@
from infrahub.api.exceptions import SchemaNotValidError
from infrahub.core import registry
from infrahub.core.branch import Branch # noqa: TCH001
from infrahub.core.migrations.schema.runner import schema_migrations_runner
from infrahub.core.migrations.schema.models import SchemaApplyMigrationData
from infrahub.core.models import SchemaBranchHash, SchemaDiff # noqa: TCH001
from infrahub.core.schema import GenericSchema, MainSchemaTypes, NodeSchema, ProfileSchema, SchemaRoot
from infrahub.core.schema_manager import SchemaBranch, SchemaNamespace, SchemaUpdateValidationResult # noqa: TCH001
from infrahub.core.validators.checker import schema_validators_checker
from infrahub.core.validators.models.validate_migration import SchemaValidateMigrationData
from infrahub.database import InfrahubDatabase # noqa: TCH001
from infrahub.exceptions import MigrationError
from infrahub.log import get_logger
from infrahub.message_bus import Meta, messages
from infrahub.services import services
from infrahub.types import ATTRIBUTE_PYTHON_TYPES
from infrahub.worker import WORKER_IDENTITY
from infrahub.workflows.catalogue import SCHEMA_APPLY_MIGRATION, SCHEMA_VALIDATE_MIGRATION

if TYPE_CHECKING:
from typing_extensions import Self
Expand Down Expand Up @@ -258,11 +259,16 @@ async def load_schema(
# ----------------------------------------------------------
# Validate if the new schema is valid with the content of the database
# ----------------------------------------------------------
error_messages, _ = await schema_validators_checker(
branch=branch, schema=candidate_schema, constraints=result.constraints, service=service
validate_migration_data = SchemaValidateMigrationData(
branch=branch,
schema_branch=candidate_schema,
constraints=result.constraints,
)
error_messages = await service.workflow.execute( # type: ignore[var-annotated]
workflow=SCHEMA_VALIDATE_MIGRATION, message=validate_migration_data
)
if error_messages:
raise SchemaNotValidError(message=",\n".join(error_messages))
if error_messages: # type: ignore[has-type]
raise SchemaNotValidError(message=",\n".join(error_messages)) # type: ignore[has-type]

# ----------------------------------------------------------
# Update the schema
Expand Down Expand Up @@ -293,15 +299,18 @@ async def load_schema(
# ----------------------------------------------------------
# Run the migrations
# ----------------------------------------------------------
error_messages = await schema_migrations_runner(
apply_migration_data = SchemaApplyMigrationData(
branch=branch,
new_schema=candidate_schema,
previous_schema=origin_schema,
migrations=result.migrations,
service=service,
)
if error_messages:
raise MigrationError(message=",\n".join(error_messages))
migration_error_msgs = await service.workflow.execute( # type: ignore[var-annotated]
workflow=SCHEMA_APPLY_MIGRATION, message=apply_migration_data
)

if migration_error_msgs:
raise MigrationError(message=",\n".join(migration_error_msgs))

if config.SETTINGS.broker.enable:
message = messages.EventSchemaUpdate(
Expand Down Expand Up @@ -339,8 +348,13 @@ async def check_schema(
# ----------------------------------------------------------
# Validate if the new schema is valid with the content of the database
# ----------------------------------------------------------
error_messages, _ = await schema_validators_checker(
branch=branch, schema=candidate_schema, constraints=result.constraints, service=service
validate_migration_data = SchemaValidateMigrationData(
branch=branch,
schema_branch=candidate_schema,
constraints=result.constraints,
)
error_messages = await service.workflow.execute( # type: ignore[var-annotated]
workflow=SCHEMA_VALIDATE_MIGRATION, message=validate_migration_data
)
if error_messages:
raise SchemaNotValidError(message=",\n".join(error_messages))
Expand Down
202 changes: 110 additions & 92 deletions backend/infrahub/cli/db.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import importlib
import logging
import os
from enum import Enum
from typing import TYPE_CHECKING, Optional

import typer
from infrahub_sdk.async_typer import AsyncTyper
from prefect.testing.utilities import prefect_test_harness
from rich import print as rprint
from rich.console import Console
from rich.logging import RichHandler
Expand All @@ -18,16 +20,18 @@
from infrahub.core.graph.schema import GRAPH_SCHEMA
from infrahub.core.initialization import first_time_initialization, get_root_node, initialization, initialize_registry
from infrahub.core.migrations.graph import get_graph_migrations
from infrahub.core.migrations.schema.runner import schema_migrations_runner
from infrahub.core.migrations.schema.models import SchemaApplyMigrationData
from infrahub.core.schema import SchemaRoot, core_models, internal_schema
from infrahub.core.schema.definitions.deprecated import deprecated_models
from infrahub.core.schema_manager import SchemaManager
from infrahub.core.utils import delete_all_nodes
from infrahub.core.validators.checker import schema_validators_checker
from infrahub.core.validators.models.validate_migration import SchemaValidateMigrationData
from infrahub.database import DatabaseType
from infrahub.log import get_logger
from infrahub.services import InfrahubServices
from infrahub.services.adapters.message_bus.local import BusSimulator
from infrahub.services.adapters.workflow.local import WorkflowLocalExecution
from infrahub.workflows.catalogue import SCHEMA_APPLY_MIGRATION, SCHEMA_VALIDATE_MIGRATION

if TYPE_CHECKING:
from infrahub.cli.context import CliContext
Expand Down Expand Up @@ -177,6 +181,9 @@ async def update_core_schema( # pylint: disable=too-many-statements
"""Check the current format of the internal graph and apply the necessary migrations"""
logging.getLogger("infrahub").setLevel(logging.WARNING)
logging.getLogger("neo4j").setLevel(logging.ERROR)
logging.getLogger("prefect").setLevel(logging.ERROR)
os.environ["PREFECT_SERVER_ANALYTICS_ENABLED"] = "false"

config.load_and_exit(config_file_name=config_file)

context: CliContext = ctx.obj
Expand All @@ -185,98 +192,109 @@ async def update_core_schema( # pylint: disable=too-many-statements
error_badge = "[bold red]ERROR[/bold red]"

async with dbdriver.start_session() as db:
# ----------------------------------------------------------
# Initialize Schema and Registry
# ----------------------------------------------------------
service = InfrahubServices(database=db, message_bus=BusSimulator(database=db))
await initialize_registry(db=db)

default_branch = registry.get_branch_from_registry(branch=registry.default_branch)

registry.schema = SchemaManager()
schema = SchemaRoot(**internal_schema)
registry.schema.register_schema(schema=schema)

# ----------------------------------------------------------
# Load Current Schema from the database
# ----------------------------------------------------------
schema_default_branch = await registry.schema.load_schema_from_db(db=db, branch=default_branch)
registry.schema.set_schema_branch(name=default_branch.name, schema=schema_default_branch)
branch_schema = registry.schema.get_schema_branch(name=registry.default_branch)

candidate_schema = branch_schema.duplicate()
candidate_schema.load_schema(schema=SchemaRoot(**internal_schema))
candidate_schema.load_schema(schema=SchemaRoot(**core_models))
candidate_schema.load_schema(schema=SchemaRoot(**deprecated_models))
candidate_schema.process()

result = branch_schema.validate_update(other=candidate_schema, enforce_update_support=False)
if result.errors:
rprint(f"{error_badge} | Unable to update the schema, due to failed validations")
for error in result.errors:
rprint(error.to_string())
raise typer.Exit(1)

if not result.diff.all:
rprint("Core Schema Up to date, nothing to update")
raise typer.Exit(0)

rprint("Core Schema has diff, will need to be updated")
if debug:
result.diff.print()

# ----------------------------------------------------------
# Validate if the new schema is valid with the content of the database
# ----------------------------------------------------------
error_messages, _ = await schema_validators_checker(
branch=default_branch, schema=candidate_schema, constraints=result.constraints, service=service
)
if error_messages:
rprint(f"{error_badge} | Unable to update the schema, due to failed validations")
for message in error_messages:
rprint(message)
raise typer.Exit(1)

# ----------------------------------------------------------
# Update the schema
# ----------------------------------------------------------
origin_schema = branch_schema.duplicate()

# Update the internal schema
schema_default_branch.load_schema(schema=SchemaRoot(**internal_schema))
schema_default_branch.process()
registry.schema.set_schema_branch(name=default_branch.name, schema=schema_default_branch)

async with db.start_transaction() as dbt:
await registry.schema.update_schema_branch(
schema=candidate_schema,
db=dbt,
branch=default_branch.name,
diff=result.diff,
limit=result.diff.all,
update_db=True,
with prefect_test_harness():
# ----------------------------------------------------------
# Initialize Schema and Registry
# ----------------------------------------------------------
service = InfrahubServices(
database=db, message_bus=BusSimulator(database=db), workflow=WorkflowLocalExecution()
)
default_branch.update_schema_hash()
rprint("The Core Schema has been updated")
await initialize_registry(db=db)

default_branch = registry.get_branch_from_registry(branch=registry.default_branch)

registry.schema = SchemaManager()
schema = SchemaRoot(**internal_schema)
registry.schema.register_schema(schema=schema)

# ----------------------------------------------------------
# Load Current Schema from the database
# ----------------------------------------------------------
schema_default_branch = await registry.schema.load_schema_from_db(db=db, branch=default_branch)
registry.schema.set_schema_branch(name=default_branch.name, schema=schema_default_branch)
branch_schema = registry.schema.get_schema_branch(name=registry.default_branch)

candidate_schema = branch_schema.duplicate()
candidate_schema.load_schema(schema=SchemaRoot(**internal_schema))
candidate_schema.load_schema(schema=SchemaRoot(**core_models))
candidate_schema.load_schema(schema=SchemaRoot(**deprecated_models))
candidate_schema.process()

result = branch_schema.validate_update(other=candidate_schema, enforce_update_support=False)
if result.errors:
rprint(f"{error_badge} | Unable to update the schema, due to failed validations")
for error in result.errors:
rprint(error.to_string())
raise typer.Exit(1)

if not result.diff.all:
rprint("Core Schema Up to date, nothing to update")
raise typer.Exit(0)

rprint("Core Schema has diff, will need to be updated")
if debug:
rprint(f"New schema hash: {default_branch.active_schema_hash.main}")
await default_branch.save(db=dbt)

# ----------------------------------------------------------
# Run the migrations
# ----------------------------------------------------------
error_messages = await schema_migrations_runner(
branch=default_branch,
new_schema=candidate_schema,
previous_schema=origin_schema,
migrations=result.migrations,
service=service,
)
if error_messages:
rprint(f"{error_badge} | Some error(s) happened while running the schema migrations")
for message in error_messages:
rprint(message)
raise typer.Exit(1)
result.diff.print()

# ----------------------------------------------------------
# Validate if the new schema is valid with the content of the database
# ----------------------------------------------------------
validate_migration_data = SchemaValidateMigrationData(
branch=default_branch,
schema_branch=candidate_schema,
constraints=result.constraints,
)
error_messages = await service.workflow.execute( # type: ignore[var-annotated]
workflow=SCHEMA_VALIDATE_MIGRATION, message=validate_migration_data
)
if error_messages:
rprint(f"{error_badge} | Unable to update the schema, due to failed validations")
for message in error_messages:
rprint(message)
raise typer.Exit(1)

# ----------------------------------------------------------
# Update the schema
# ----------------------------------------------------------
origin_schema = branch_schema.duplicate()

# Update the internal schema
schema_default_branch.load_schema(schema=SchemaRoot(**internal_schema))
schema_default_branch.process()
registry.schema.set_schema_branch(name=default_branch.name, schema=schema_default_branch)

async with db.start_transaction() as dbt:
await registry.schema.update_schema_branch(
schema=candidate_schema,
db=dbt,
branch=default_branch.name,
diff=result.diff,
limit=result.diff.all,
update_db=True,
)
default_branch.update_schema_hash()
rprint("The Core Schema has been updated")
if debug:
rprint(f"New schema hash: {default_branch.active_schema_hash.main}")
await default_branch.save(db=dbt)

# ----------------------------------------------------------
# Run the migrations
# ----------------------------------------------------------
apply_migration_data = SchemaApplyMigrationData(
branch=default_branch,
new_schema=candidate_schema,
previous_schema=origin_schema,
migrations=result.migrations,
)
migration_error_msgs = await service.workflow.execute( # type: ignore[var-annotated]
workflow=SCHEMA_APPLY_MIGRATION, message=apply_migration_data
)

if migration_error_msgs:
rprint(f"{error_badge} | Some error(s) happened while running the schema migrations")
for message in migration_error_msgs:
rprint(message)
raise typer.Exit(1)


@app.command()
Expand Down
15 changes: 15 additions & 0 deletions backend/infrahub/core/migrations/schema/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from pydantic import BaseModel, ConfigDict

from infrahub.core.branch import Branch
from infrahub.core.models import SchemaUpdateMigrationInfo
from infrahub.core.schema_manager import SchemaBranch


class SchemaApplyMigrationData(BaseModel):
    """Input payload for the schema-migration workflow.

    Bundles the values that callers pass as ``message`` when executing the
    ``SCHEMA_APPLY_MIGRATION`` workflow: the branch, the schema before and
    after the update, and the list of migrations to run.
    """

    # `arbitrary_types_allowed` lets pydantic accept field types it cannot
    # validate natively (presumably required for SchemaBranch -- confirm).
    # `json_encoders` tells pydantic to serialize any SchemaBranch value through
    # SchemaBranch.to_dict_schema_object when dumping the model to JSON.
    model_config = ConfigDict(
        arbitrary_types_allowed=True, json_encoders={SchemaBranch: SchemaBranch.to_dict_schema_object}
    )
    # Branch the workflow operates on.
    branch: Branch
    # Schema after the update (callers pass the candidate schema here).
    new_schema: SchemaBranch
    # Schema before the update (callers pass a duplicate taken prior to the update).
    previous_schema: SchemaBranch
    # Migrations to run (callers pass the `migrations` of the schema validation result).
    migrations: list[SchemaUpdateMigrationInfo]
5 changes: 4 additions & 1 deletion backend/infrahub/core/migrations/schema/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import asyncio
from typing import TYPE_CHECKING, Optional

from infrahub.message_bus.messages.schema_migration_path import SchemaMigrationPath, SchemaMigrationPathResponse
from infrahub.message_bus.messages.schema_migration_path import (
SchemaMigrationPath,
SchemaMigrationPathResponse,
)

if TYPE_CHECKING:
from infrahub.core.branch import Branch
Expand Down
Loading

0 comments on commit 8b34cbe

Please sign in to comment.