Skip to content

Commit

Permalink
Add migration to fix database edges state
Browse files Browse the repository at this point in the history
  • Loading branch information
LucasG0 committed Feb 11, 2025
1 parent 38f593f commit 7828def
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 19 deletions.
2 changes: 1 addition & 1 deletion backend/infrahub/core/graph/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
GRAPH_VERSION = 18
GRAPH_VERSION = 19
2 changes: 2 additions & 0 deletions backend/infrahub/core/migrations/graph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .m016_diff_delete_bug_fix import Migration016
from .m017_add_core_profile import Migration017
from .m018_uniqueness_nulls import Migration018
from .m019_restore_rels_to_time import Migration019

if TYPE_CHECKING:
from infrahub.core.root import Root
Expand All @@ -45,6 +46,7 @@
Migration016,
Migration017,
Migration018,
Migration019,
]


Expand Down
153 changes: 153 additions & 0 deletions backend/infrahub/core/migrations/graph/m019_restore_rels_to_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Sequence

from infrahub.core.migrations.shared import GraphMigration, MigrationResult
from infrahub.log import get_logger

from ...constants import GLOBAL_BRANCH_NAME
from ...query import Query, QueryType

if TYPE_CHECKING:
from infrahub.database import InfrahubDatabase

log = get_logger()


class FixBranchAwareEdgesQuery(Query):
name = "replace_global_edges"
type = QueryType.WRITE
insert_return = False

async def query_init(self, db: InfrahubDatabase, **kwargs: dict[str, Any]) -> None:
"""
Between a Node and a Relationship, if Relationship.branch_support=aware, replace any global edge
to the branch of a non-global edge leaving out of the Relationship node. Note that there can't
be multiple non-global branches on these edges, as a dedicated Relationship node would exist for that.
"""

query = """
MATCH (node:Node)-[global_edge:IS_RELATED {branch: $global_branch}]-(rel:Relationship)
MATCH (rel)-[non_global_edge:IS_RELATED]-(node_2: Node)
WHERE non_global_edge.branch <> $global_branch
SET global_edge.branch = non_global_edge.branch
"""

params = {"global_branch": GLOBAL_BRANCH_NAME}
self.params.update(params)
self.add_to_query(query)


class SetMissingToTimeQuery(Query):
name = "set_missing_to_time"
type = QueryType.WRITE
insert_return = False

async def query_init(self, db: InfrahubDatabase, **kwargs: dict[str, Any]) -> None:
"""
If both a deleted edge and an active edge with no time exist between 2 nodes on the same branch,
set `to` time of active edge using `from` time of the deleted one. This would typically happen after having
replaced a deleted edge on global branch by correct branch with above query.
"""

query = """
MATCH (node:Node)-[deleted_edge:IS_RELATED {status: "deleted"}]-(rel:Relationship)
MATCH (rel)-[active_edge:IS_RELATED {status: "active"}]-()
WHERE active_edge.to IS NULL AND deleted_edge.branch = active_edge.branch
SET active_edge.to = deleted_edge.from
"""

params = {"global_branch": GLOBAL_BRANCH_NAME}
self.params.update(params)
self.add_to_query(query)


class DeleteNodesRelsQuery(Query):
name = "delete_relationships_of_deleted_nodes"
type = QueryType.WRITE
insert_return = False

async def query_init(self, db: InfrahubDatabase, **kwargs: dict[str, Any]) -> None:
"""
Some nodes may have been deleted while having corrupted state that are fixes by above migrations.
While these nodes edges connected to Root are correctly deleted,
edges connected to other `Node` through a `Relationship` node may still be active.
Following query correctly deletes these edges by both setting correct to time and creating corresponding deleted edge.
"""

query = """
MATCH (deleted_node: CoreStandardGroup)-[deleted_edge:IS_PART_OF {status: "deleted"}]->(:Root)
MATCH (deleted_node)-[:IS_RELATED]-(rel:Relationship)
// Set to time if there is an active edge on deleted edge branch
OPTIONAL MATCH (rel)-[peer_active_edge:IS_RELATED {status: "active"}]-(peer_1: Node)
WHERE peer_active_edge.branch = deleted_edge.branch AND peer_active_edge.to IS NULL
SET peer_active_edge.to = deleted_edge.from
// Check if deleted edge exists on this branch between Relationship and any peer_2 Node connected. Create it if it doesn't.
WITH deleted_edge.branch AS branch, deleted_edge.branch_level AS branch_level, deleted_edge.from as deleted_time, rel
MATCH (rel)-[:IS_RELATED]-(peer_2:Node)
CALL {
WITH rel, peer_2, branch
OPTIONAL MATCH (rel)-[r:IS_RELATED {branch: branch}]-(peer_2)
WHERE r.status = "deleted"
RETURN r IS NOT NULL AS has_deleted_edge
}
// The branch on which `deleted` edge might be created depends on Relationship.branch_support
WITH branch, branch_level, deleted_time, rel, has_deleted_edge, peer_2
WHERE has_deleted_edge = FALSE // only look at rel-peer_2 couples not having a deleted edge
OPTIONAL MATCH (rel)-[active_edge:IS_RELATED {status: "active"}]-(peer_3: Node)
WHERE active_edge.branch IS NOT NULL
WITH rel, active_edge, peer_3,
CASE
WHEN rel.branch_support = "agnostic" THEN $global_branch
WHEN rel.branch_support = "aware" THEN COALESCE(active_edge.branch, NULL)
ELSE NULL // Ending up here means there is no active branch between rel its peer Node,
// so there must be a deleted edge already, and thus we will not create one.
END AS branch,
branch_level,
deleted_time,
peer_2
// Need 2 calls to create the edge in the correct direction. Also note that MERGE ensures we do not create multiple times.
CALL {
WITH rel, peer_2, branch, branch_level, deleted_time
MATCH (rel)-[:IS_RELATED]->(peer_2)
MERGE (rel)-[:IS_RELATED {status: "deleted", branch: branch, branch_level: branch_level, from: deleted_time}]->(peer_2)
}
CALL {
WITH rel, peer_2, branch, branch_level, deleted_time
MATCH (rel)<-[:IS_RELATED]-(peer_2)
MERGE (rel)<-[:IS_RELATED {status: "deleted", branch: branch, branch_level: branch_level, from: deleted_time}]-(peer_2)
}
"""

params = {"global_branch": GLOBAL_BRANCH_NAME}
self.params.update(params)
self.add_to_query(query)


class Migration019(GraphMigration):
"""
Fix corrupted state introduced by Migration012 when duplicating a CoreAccount (branch Aware)
being part of a CoreStandardGroup (branch Agnostic). Database is corrupted at multiple points:
- Old CoreAccount node <> group_member node `active` edge has no `to` time (possibly because of #5590).
- Old CoreAccount node <> group_member node `deleted` edge is on `-global-` branch instead of `main`.
- New CoreAccount node <> group_member node `active` edge is on `-global-` branch instead of `main`.
Also, users having deleted corresponding CoreStandardGroup will also have the following data corruption,
as deletion did not happen correctly due to above issues:
- Both CoreAccount <> group_member and CoreStandardGroup <> group_member edges
have not been deleted (ie status is `active` without `to` time and no additional `deleted` edge).
This migration fixes all above issues to have consistent edges, and fixes IFC-1204.
"""

name: str = "019_fix_edges_state"
minimum_version: int = 18
queries: Sequence[type[Query]] = [FixBranchAwareEdgesQuery, SetMissingToTimeQuery, DeleteNodesRelsQuery]

async def validate_migration(self, db: InfrahubDatabase) -> MigrationResult:
result = MigrationResult()
return result
13 changes: 7 additions & 6 deletions backend/infrahub/core/migrations/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,13 @@ async def execute(self, db: InfrahubDatabase) -> MigrationResult:
result = MigrationResult()

for migration_query in self.queries:
try:
query = await migration_query.init(db=ts)
await query.execute(db=ts)
except Exception as exc: # pylint: disable=broad-exception-caught
result.errors.append(str(exc))
return result
# try:
query = await migration_query.init(db=ts)
print(f"{query.name=}")
await query.execute(db=ts)
# except Exception as exc: # pylint: disable=broad-exception-caught
# result.errors.append(str(exc))
# return result

return result

Expand Down
20 changes: 8 additions & 12 deletions backend/infrahub/core/relationship/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,8 @@ def get_branch_based_on_support_type(self) -> Branch:
"""If the attribute is branch aware, return the Branch object associated with this attribute
If the attribute is branch agnostic return the Global Branch
Note that if this relationship is Aware and source node is Agnostic, it will return -global- branch.
Returns:
Branch:
"""
Expand Down Expand Up @@ -959,7 +961,7 @@ async def _fetch_relationships(
self.has_fetched_relationships = True

for peer_id in details.peer_ids_present_local_only:
await self.remove(peer_id=peer_id, db=db)
await self.remove_locally(peer_id=peer_id, db=db)

async def get(self, db: InfrahubDatabase) -> Relationship | list[Relationship] | None:
rels = await self.get_relationships(db=db)
Expand Down Expand Up @@ -1077,22 +1079,17 @@ async def resolve(self, db: InfrahubDatabase) -> None:
for rel in self._relationships:
await rel.resolve(db=db)

async def remove(
async def remove_locally(
self,
peer_id: Union[str, UUID],
db: InfrahubDatabase,
update_db: bool = False,
) -> bool:
"""Remove a peer id from the local relationships list,
need to investigate if and when we should update the relationship in the database."""
"""Remove a peer id from the local relationships list"""

for idx, rel in enumerate(await self.get_relationships(db=db)):
if str(rel.peer_id) != str(peer_id):
continue

if update_db:
await rel.delete(db=db)

self._relationships.pop(idx)
return True

Expand All @@ -1109,14 +1106,13 @@ async def remove_in_db(

# - Update the existing relationship if we are on the same branch
rel_ids_per_branch = peer_data.rel_ids_per_branch()

# In which cases do we end up here and do not want to set `to` time?
if branch.name in rel_ids_per_branch:
await update_relationships_to([str(ri) for ri in rel_ids_per_branch[branch.name]], to=remove_at, db=db)

# - Create a new rel of type DELETED if the existing relationship is on a different branch
rel_branches: set[str] = set()
if peer_data.rels:
rel_branches = {r.branch for r in peer_data.rels}
if rel_branches == {peer_data.branch}:
if peer_data.rels and {r.branch for r in peer_data.rels} == {peer_data.branch}:
return

query = await RelationshipDataDeleteQuery.init(
Expand Down
1 change: 1 addition & 0 deletions backend/infrahub/core/schema/definitions/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@
"optional": True,
"identifier": "group_member",
"cardinality": "many",
"branch": BranchSupportType.AWARE,
},
{
"name": "subscribers",
Expand Down
2 changes: 2 additions & 0 deletions backend/infrahub/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ async def execute_query_with_metadata(
)

with QUERY_EXECUTION_METRICS.labels(**labels).time():
if name == "replace_global_edges":
print(f"{query=}")
response = await self.run_query(query=query, params=params, name=name)
if response is None:
return [], {}
Expand Down
99 changes: 99 additions & 0 deletions backend/tests/unit/core/migrations/graph/test_019.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from infrahub_sdk import InfrahubClient

from infrahub.core.constants import GLOBAL_BRANCH_NAME
from infrahub.core.migrations.graph import Migration019
from infrahub.core.node import Node
from infrahub.database import InfrahubDatabase
from tests.helpers.test_app import TestInfrahubApp


class TestMigration019(TestInfrahubApp):
async def test_migration_019(
self,
client: InfrahubClient,
db: InfrahubDatabase,
default_branch,
):
"""
Reproduce corrupted state introduced by migration 12, and apply the migration fixing it.
"""

test_group = await Node.init(db=db, schema="CoreStandardGroup")
await test_group.new(db=db, name="test_group")
await test_group.save(db=db)

core_acc = await Node.init(db=db, schema="CoreAccount")
await core_acc.new(db=db, name="core_acc", account_type="User", password="def", member_of_groups=[test_group])
await core_acc.save(db=db)

# Delete CoreStandardGroup. This should also (correctly) update rels to CoreGenericAccount and CoreAccount
# but we will override them afterward to reproduce corrupted state.
await test_group.delete(db=db)

async with db.start_session() as dbs:
async with dbs.start_transaction() as ts:
# Make relationship between CoreAccount <> group_member <> CoreStandardGroup active while it should have been deleted.
# and make the group_member <> CoreAccount edge part on global branch

query = """
MATCH (new_core_acc: CoreAccount)-[:HAS_ATTRIBUTE]->(:Attribute {name: "name"})-[:HAS_VALUE]->(:AttributeValue {value: "core_acc"})
MATCH (new_core_acc)-[r1:IS_RELATED]-(group_member: Relationship)-[r2:IS_RELATED]-(test_group: CoreStandardGroup)
MATCH (new_core_acc)-[active_r1]-(group_member)
MATCH (new_core_acc)-[deleted_r1]-(group_member)
MATCH (test_group)-[active_r2]-(group_member)
MATCH (test_group)-[deleted_r2]-(group_member)
WHERE active_r1.status = 'active' AND deleted_r1.status = 'deleted'
AND active_r2.status = 'active' AND deleted_r2.status = 'deleted'
DELETE deleted_r1
REMOVE active_r1.to
SET active_r1.branch = '-global'
DELETE deleted_r2
REMOVE active_r2.to
return new_core_acc, group_member, test_group
"""

await ts.execute_query(query=query, name="query_1")

# Create the old CoreAccount object - not inheriting from GenericAccount -
# sharing same attributes / relationships than above CoreAccount

query_2 = """
// Match the existing CoreAccount node with the specified attributes
MATCH (new_core_acc:CoreAccount)-[:HAS_ATTRIBUTE]->(:Attribute {name: "name"})-[:HAS_VALUE]->(:AttributeValue {value: "core_acc"})
// Create the new CoreAccount node with the same uuid and additional properties
CREATE (new_node:CoreAccount:LineageOwner:LineageSource:Node {uuid: new_core_acc.uuid,
branch_support: new_core_acc.branch_support, namespace: new_core_acc.namespace, kind: "CoreAccount"})
WITH new_node, new_core_acc
// Match the relationships of the existing CoreAccount node
MATCH (new_core_acc)-[r:IS_RELATED]->(group_member:Relationship {name: "group_member"})
// Create active branch with no to time on main branch
CREATE (new_node)-[:IS_RELATED {branch: "main", from: "2024-02-05T15:37:07.228145Z", status: "active"}]->(group_member)
// Create deleted branch with no to time on global branch
CREATE (new_node)-[:IS_RELATED {branch: $global_branch, from: r.from, status: "deleted"}]->(group_member)
// Return the new_node
RETURN new_node;
"""

await ts.execute_query(query=query_2, name="query_2", params={"global_branch": GLOBAL_BRANCH_NAME})

# Make sure migration executes without error, and that we can query accounts afterwards.
# Note generated corrupted state does not trigger IFC-1204 bug,
# but a manual test confirmed migration solves this issue.

migration = Migration019()
await migration.execute(db=db)
await migration.validate_migration(db=db)

# Trigger e2e path to query this account
core_acc = await client.get(kind="CoreAccount", id=core_acc.id, prefetch_relationships=True)
assert core_acc.name.value == "core_acc"

0 comments on commit 7828def

Please sign in to comment.