diff --git a/causy/causal_discovery/constraint/orientation_rules/pc.py b/causy/causal_discovery/constraint/orientation_rules/pc.py index 78e6a08..36adbaa 100644 --- a/causy/causal_discovery/constraint/orientation_rules/pc.py +++ b/causy/causal_discovery/constraint/orientation_rules/pc.py @@ -248,14 +248,14 @@ def process( # if one edge has an arrowhead at z, orient the other one pointing away from z. # It cannot be a collider because we have already oriented all unshielded triples that contain colliders. for z in potential_zs: - print(f"x: {x.name}, y: {y.name}, z: {z}") z = graph.nodes[z] + print(f"x: {x.name}, y: {y.name}, z: {z.name}") breakflag = False if graph.only_directed_edge_exists(x, z) and graph.undirected_edge_exists( z, y ): for node in graph.nodes: - if graph.only_directed_edge_exists(graph.nodes[node], y): + if graph.only_directed_edge_exists(graph.nodes[node], y) and not graph.edge_exists(graph.nodes[node], z): breakflag = True break if breakflag is True: diff --git a/tests/test_orientation_tests.py b/tests/test_orientation_tests.py index 515afc4..aaf7aa1 100644 --- a/tests/test_orientation_tests.py +++ b/tests/test_orientation_tests.py @@ -437,7 +437,7 @@ def test_non_collider_test(self): self.assertTrue(model.graph.only_directed_edge_exists(x, y)) self.assertTrue(model.graph.only_directed_edge_exists(y, z)) - def test_non_collider_test_auto_mpg_graph_after_collider_rule(self): + def test_non_collider_test_auto_mpg_graph_after_collider_rule_noncollider_test(self): pipeline = [NonColliderTest()] model = graph_model_factory( Algorithm( @@ -456,24 +456,29 @@ def test_non_collider_test_auto_mpg_graph_after_collider_rule(self): model.graph.add_edge(mpg, weight, {}) model.graph.add_edge(displacement, cylinders, {}) - model.graph.add_edge(acceleration, horsepower, {}) + model.graph.add_edge(horsepower, displacement, {}) + # collider acceleration -> horsepower <- weight model.graph.add_directed_edge(acceleration, horsepower, {}) - model.graph.add_directed_edge(mpg, horsepower, {}) model.graph.add_directed_edge(weight, horsepower, {}) + # collider mpg -> horsepower <- acceleration + model.graph.add_directed_edge(mpg, horsepower, {}) + # collider acceleration -> displacement <- weight model.graph.add_directed_edge(weight, displacement, {}) model.graph.add_directed_edge(acceleration, displacement, {}) model.execute_pipeline_steps() + # test NonColliderTest self.assertTrue(model.graph.edge_of_type_exists(displacement, cylinders, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(horsepower, displacement, DirectedEdge())) - def test_non_collider_test_auto_mpg_graph(self): - pipeline = [NonColliderTest()] + def test_non_collider_test_auto_mpg_graph_after_collider_rule_whole_loop(self): + pipeline = [*PC_ORIENTATION_RULES] model = graph_model_factory( Algorithm( pipeline_steps=pipeline, edge_types=[DirectedEdge(), UndirectedEdge()], - name="TestCollider", + name="TestLoopAutoMpgGraph", ) )() model.graph = GraphManager() @@ -483,19 +488,118 @@ def test_non_collider_test_auto_mpg_graph(self): cylinders = model.graph.add_node("cylinders", []) displacement = model.graph.add_node("displacement", []) weight = model.graph.add_node("weight", []) + model.graph.add_edge(mpg, weight, {}) - model.graph.add_edge(weight, displacement, {}) model.graph.add_edge(displacement, cylinders, {}) - model.graph.add_edge(weight, horsepower, {}) - model.graph.add_edge(acceleration, horsepower, {}) + model.graph.add_edge(horsepower, displacement, {}) + # collider acceleration -> horsepower <- weight model.graph.add_directed_edge(acceleration, horsepower, {}) - model.graph.add_directed_edge(horsepower, displacement, {}) + model.graph.add_directed_edge(weight, horsepower, {}) + # collider mpg -> horsepower <- acceleration model.graph.add_directed_edge(mpg, horsepower, {}) + # collider acceleration -> displacement <- weight + model.graph.add_directed_edge(weight, displacement, {}) + model.graph.add_directed_edge(acceleration, displacement, {}) model.execute_pipeline_steps() + # test NonColliderTest self.assertTrue(model.graph.edge_of_type_exists(displacement, cylinders, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(horsepower, displacement, DirectedEdge())) + + def test_only_non_collider_rule_on_loop_test_model(self): + pipeline = [NonColliderTest()] + model = graph_model_factory( + Algorithm( + pipeline_steps=pipeline, + edge_types=[DirectedEdge(), UndirectedEdge()], + name="TestLoop", + ) + )() + model.graph = GraphManager() + x = model.graph.add_node("X", []) + y = model.graph.add_node("Y", []) + z = model.graph.add_node("Z", []) + w = model.graph.add_node("W", []) + model.graph.add_edge(z, w, {}) + model.graph.add_directed_edge(x, z, {}) + model.graph.add_directed_edge(y, z, {}) + model.execute_pipeline_steps() + self.assertTrue(model.graph.edge_of_type_exists(x, z, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(y, z, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(z, w, DirectedEdge())) + def test_loop(self): + pipeline = [*PC_ORIENTATION_RULES] + model = graph_model_factory( + Algorithm( + pipeline_steps=pipeline, + edge_types=[DirectedEdge(), UndirectedEdge()], + name="TestLoop", + ) + )() + model.graph = GraphManager() + x = model.graph.add_node("X", []) + y = model.graph.add_node("Y", []) + z = model.graph.add_node("Z", []) + w = model.graph.add_node("W", []) + model.graph.add_edge(z, w, {}) + model.graph.add_directed_edge(x, z, {}) + model.graph.add_directed_edge(y, z, {}) + model.execute_pipeline_steps() + self.assertTrue(model.graph.edge_of_type_exists(x, z, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(y, z, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(z, w, DirectedEdge())) + + def test_loop_two_iterations(self): + pipeline = [*PC_ORIENTATION_RULES] + model = graph_model_factory( + Algorithm( + pipeline_steps=pipeline, + edge_types=[DirectedEdge(), UndirectedEdge()], + name="TestLoop", + ) + )() + model.graph = GraphManager() + x = model.graph.add_node("X", []) + y = model.graph.add_node("Y", []) + z = model.graph.add_node("Z", []) + w = model.graph.add_node("W", []) + v = model.graph.add_node("V", []) + model.graph.add_edge(z, w, {}) + model.graph.add_edge(w, v, {}) + model.graph.add_directed_edge(x, z, {}) + model.graph.add_directed_edge(y, z, {}) + model.execute_pipeline_steps() + self.assertTrue(model.graph.edge_of_type_exists(x, z, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(y, z, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(z, w, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(w, v, DirectedEdge())) + + def test_only_noncollider_rule_on_loop_model_after_one_iteration(self): + pipeline = [NonColliderTest()] + model = graph_model_factory( + Algorithm( + pipeline_steps=pipeline, + edge_types=[DirectedEdge(), UndirectedEdge()], + name="TestLoop", + ) + )() + model.graph = GraphManager() + x = model.graph.add_node("X", []) + y = model.graph.add_node("Y", []) + z = model.graph.add_node("Z", []) + w = model.graph.add_node("W", []) + v = model.graph.add_node("V", []) + model.graph.add_edge(w, v, {}) + model.graph.add_directed_edge(z, w, {}) + model.graph.add_directed_edge(x, z, {}) + model.graph.add_directed_edge(y, z, {}) + model.execute_pipeline_steps() + self.assertTrue(model.graph.edge_of_type_exists(x, z, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(y, z, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(z, w, DirectedEdge())) + self.assertTrue(model.graph.edge_of_type_exists(w, v, DirectedEdge())) def test_non_collider_loop_auto_mpg_graph(self): pipeline = [*PC_ORIENTATION_RULES] model = graph_model_factory( diff --git a/tests/test_pc_e2e.py b/tests/test_pc_e2e.py index cba7aaf..a1cacc8 100644 --- a/tests/test_pc_e2e.py +++ b/tests/test_pc_e2e.py @@ -13,7 +13,7 @@ ComputeDirectEffectsInDAGsMultivariateRegression, ) from causy.common_pipeline_steps.calculation import CalculatePearsonCorrelations -from causy.edge_types import DirectedEdge +from causy.edge_types import DirectedEdge, UndirectedEdge from causy.generators import PairsWithNeighboursGenerator from causy.graph_model import graph_model_factory from causy.causal_discovery.constraint.independence_tests.common import ( @@ -108,11 +108,15 @@ def test_pc_e2e_auto_mpg(self): self.assertEqual(pc.graph.edge_exists("horsepower", "cylinders"), False) # directtions + self.assertEqual(pc.graph.edge_of_type_exists("mpg", "weight", UndirectedEdge()), True) self.assertEqual(pc.graph.edge_of_type_exists("weight", "horsepower", DirectedEdge()), True) - self.assertEqual(pc.graph.edge_of_type_exists("horsepower", "displacement", DirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("weight", "displacement", DirectedEdge()), True) self.assertEqual(pc.graph.edge_of_type_exists("mpg", "horsepower", DirectedEdge()), True) self.assertEqual(pc.graph.edge_of_type_exists("acceleration", "horsepower", DirectedEdge()), True) self.assertEqual(pc.graph.edge_of_type_exists("acceleration", "displacement", DirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("displacement", "cylinders", DirectedEdge()), True) + self.assertEqual(pc.graph.edge_of_type_exists("horsepower", "displacement", DirectedEdge()), True) +