From 3cf22d1b40951afdac9b4976cfe6dceb9acd9a52 Mon Sep 17 00:00:00 2001
From: zslade
Date: Fri, 27 Oct 2023 09:05:46 +0100
Subject: [PATCH 01/14] Added method for computing cluster metrics

---
 splink/linker.py | 39 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 39 insertions(+)

diff --git a/splink/linker.py b/splink/linker.py
index cee98a959f..cc1b958b97 100644
--- a/splink/linker.py
+++ b/splink/linker.py
@@ -43,6 +43,7 @@
     unlinkables_chart,
     waterfall_chart,
 )
+from .cluster_metrics import _size_density_sql
 from .cluster_studio import render_splink_cluster_studio_html
 from .comparison import Comparison
 from .comparison_level import ComparisonLevel
@@ -2040,6 +2041,44 @@ def cluster_pairwise_predictions_at_threshold(
 
         return cc
 
+    def _compute_cluster_metrics(
+        self,
+        df_predict: SplinkDataFrame,
+        df_clustered: SplinkDataFrame,
+        threshold_match_probability: float = None,
+    ):
+        """Generates a table containing cluster metrics and returns a Splink dataframe
+
+        Args:
+            df_predict (SplinkDataFrame): The results of `linker.predict()`
+            df_clustered (SplinkDataFrame): The outputs of
+                `linker.cluster_pairwise_predictions_at_threshold()`
+            threshold_match_probability (float): Filter the pairwise match predictions
+                to include only pairwise comparisons with a match_probability above this
+                threshold.
+
+        Returns:
+            SplinkDataFrame: A SplinkDataFrame containing cluster IDs and selected cluster metrics
+
+        """
+
+        # Get unique row id column name from settings
+        unique_id_col = self._settings_dict["unique_id_column_name"]
+
+        sqls = _size_density_sql(
+            df_predict,
+            df_clustered,
+            threshold_match_probability,
+            _unique_row_id=unique_id_col,
+        )
+
+        for sql in sqls:
+            linker._enqueue_sql(sql["sql"], sql["output_table_name"])
+
+        df_cluster_metrics = linker._execute_sql_pipeline()
+
+        return df_cluster_metrics
+
     def profile_columns(
         self, column_expressions: str | list[str], top_n=10, bottom_n=10
     ):

From 4ead9e284826e92e7a4674d967dda4d74afc43c4 Mon Sep 17 00:00:00 2001
From: zslade
Date: Fri, 27 Oct 2023 09:05:59 +0100
Subject: [PATCH 02/14] SQL for computing cluster size and density

---
 splink/cluster_metrics.py | 59 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 59 insertions(+)
 create mode 100644 splink/cluster_metrics.py

diff --git a/splink/cluster_metrics.py b/splink/cluster_metrics.py
new file mode 100644
index 0000000000..0c2a576ee6
--- /dev/null
+++ b/splink/cluster_metrics.py
@@ -0,0 +1,59 @@
+def _size_density_sql(
+    df_predict, df_clustered, threshold_match_probability, _unique_row_id
+):
+    """Generates sql for computing cluster size and density at a given threshold.
+
+    Args:
+        df_predict (SplinkDataFrame): The results of `linker.predict()`
+        df_clustered (SplinkDataFrame): The outputs of
+            `linker.cluster_pairwise_predictions_at_threshold()`
+        threshold_match_probability (float): Filter the pairwise match
+            predictions to include only pairwise comparisons with a
+            match_probability above this threshold.
+        _unique_row_id (string): name of unique id column in settings dict
+
+    Returns:
+        sql string for computing cluster size and density
+    """
+
+    # Get physical table names from Splink dataframes
+    edges_table = df_edges.physical_name
+    clusters_table = df_clusters.physical_name
+
+    sqls = []
+    sql = f"""
+    SELECT
+        {_unique_row_id}_l,
+        COUNT(*) AS count_edges
+    FROM {edges_table}
+    WHERE match_probability >= {cluster_threshold}
+    GROUP BY {_unique_row_id}_l
+    """
+
+    sql = {"sql": sql, "output_table_name": "__count_edges"}
+    sqls.append(sql)
+
+    sql = f"""
+    SELECT
+        c.cluster_id,
+        count(*) AS n_nodes,
+        sum(e.count_edges) AS n_edges
+    FROM {clusters_table} AS c
+    LEFT JOIN __count_edges e ON c.{_unique_row_id} = e.{_unique_row_id}_l
+    GROUP BY c.cluster_id
+    """
+    sql = {"sql": sql, "output_table_name": "__counts_per_cluster"}
+    sqls.append(sql)
+
+    sql = """
+    SELECT
+        cluster_id,
+        n_nodes,
+        n_edges,
+        (n_edges * 2)/(n_nodes * (n_nodes-1)) AS density
+    FROM __counts_per_cluster
+    """
+    sql = {"sql": sql, "output_table_name": "__splink__cluster_metrics_clusters"}
+    sqls.append(sql)
+
+    return sqls
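Note: the final SELECT above computes the standard density of an undirected graph, i.e. the number of edges present divided by the number possible. A minimal standalone sketch of the same formula (the `density` helper is illustrative, not part of the patch):

```python
# Density of an undirected graph: edges present / edges possible.
# A cluster of n nodes can contain at most n * (n - 1) / 2 edges, so
# density = n_edges / (n_nodes * (n_nodes - 1) / 2).
def density(n_nodes: int, n_edges: int) -> float:
    return 2 * n_edges / (n_nodes * (n_nodes - 1))

assert density(3, 2) == 2 / 3  # 3 nodes joined by 2 of a possible 3 edges
assert density(2, 1) == 1.0  # a linked pair is fully connected
```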
From 9fd5463e404c3d6a46371b3a6b4e23d0fc8a65cd Mon Sep 17 00:00:00 2001
From: zslade
Date: Fri, 27 Oct 2023 09:58:50 +0100
Subject: [PATCH 03/14] linker -> self

---
 splink/linker.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/splink/linker.py b/splink/linker.py
index cc1b958b97..9792da3cdb 100644
--- a/splink/linker.py
+++ b/splink/linker.py
@@ -2073,9 +2073,9 @@ def _compute_cluster_metrics(
         )
 
         for sql in sqls:
-            linker._enqueue_sql(sql["sql"], sql["output_table_name"])
+            self._enqueue_sql(sql["sql"], sql["output_table_name"])
 
-        df_cluster_metrics = linker._execute_sql_pipeline()
+        df_cluster_metrics = self._execute_sql_pipeline()
 
         return df_cluster_metrics
 

From c2133efe7f95f78c8077e69dbe51d8ec8e74bb17 Mon Sep 17 00:00:00 2001
From: zslade
Date: Fri, 27 Oct 2023 10:56:34 +0100
Subject: [PATCH 04/14] Corrected df names

---
 splink/cluster_metrics.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/splink/cluster_metrics.py b/splink/cluster_metrics.py
index 0c2a576ee6..2fadfde3b7 100644
--- a/splink/cluster_metrics.py
+++ b/splink/cluster_metrics.py
@@ -17,8 +17,8 @@ def _size_density_sql(
     """
 
     # Get physical table names from Splink dataframes
-    edges_table = df_edges.physical_name
-    clusters_table = df_clusters.physical_name
+    df_predict = df_predict.physical_name
+    df_clustered = df_clustered.physical_name
 
     sqls = []
     sql = f"""

From 973c46260a9e42b354062886ca1bc42b6b6680a0 Mon Sep 17 00:00:00 2001
From: zslade
Date: Fri, 27 Oct 2023 10:57:27 +0100
Subject: [PATCH 05/14] Corrected table names

---
 splink/cluster_metrics.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/splink/cluster_metrics.py b/splink/cluster_metrics.py
index 2fadfde3b7..263ecc0d4c 100644
--- a/splink/cluster_metrics.py
+++ b/splink/cluster_metrics.py
@@ -17,8 +17,8 @@ def _size_density_sql(
     """
 
     # Get physical table names from Splink dataframes
-    df_predict = df_predict.physical_name
-    df_clustered = df_clustered.physical_name
+    edges_table = df_predict.physical_name
+    clusters_table = df_clustered.physical_name
 
     sqls = []
     sql = f"""

From cef9f30d856fa4eaad3adf1bdf5674c789100914 Mon Sep 17 00:00:00 2001
From: zslade
Date: Fri, 27 Oct 2023 10:59:04 +0100
Subject: [PATCH 06/14] Corrected threshold argument

---
 splink/cluster_metrics.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/splink/cluster_metrics.py b/splink/cluster_metrics.py
index 263ecc0d4c..f4a0d4b7c8 100644
--- a/splink/cluster_metrics.py
+++ b/splink/cluster_metrics.py
@@ -26,7 +26,7 @@ def _size_density_sql(
         {_unique_row_id}_l,
         COUNT(*) AS count_edges
     FROM {edges_table}
-    WHERE match_probability >= {cluster_threshold}
+    WHERE match_probability >= {threshold_match_probability}
     GROUP BY {_unique_row_id}_l
     """
 
From 37d8583257f1b66a33ec395fab6e29d547315b43 Mon Sep 17 00:00:00 2001
From: zslade
Date: Tue, 31 Oct 2023 16:46:29 +0000
Subject: [PATCH 07/14] Add test for cluster metrics

---
 tests/test_cluster_metrics.py | 49 +++++++++++++++++++++++++++++++++++
 1 file changed, 49 insertions(+)
 create mode 100644 tests/test_cluster_metrics.py

diff --git a/tests/test_cluster_metrics.py b/tests/test_cluster_metrics.py
new file mode 100644
index 0000000000..d746ea8947
--- /dev/null
+++ b/tests/test_cluster_metrics.py
@@ -0,0 +1,49 @@
+import pandas as pd
+from pandas.testing import assert_frame_equal
+from splink.duckdb.linker import DuckDBLinker
+
+# Dummy df
+person_ids = [i + 1 for i in range(5)]
+df = pd.DataFrame({"person_id": person_ids})
+
+# Dummy edges df
+edges_data = [
+    # cluster A edges
+    {"person_id_l": 1, "person_id_r": 2, "match_probability": 0.99},
+    {"person_id_l": 1, "person_id_r": 3, "match_probability": 0.99},
+    # cluster B edge
+    {"person_id_l": 4, "person_id_r": 5, "match_probability": 0.99},
+    # edges not in relevant clusters
+    {"person_id_l": 10, "person_id_r": 11, "match_probability": 0.99},
+    {"person_id_l": 12, "person_id_r": 12, "match_probability": 0.95},
+]
+edges = pd.DataFrame(edges_data)
+
+# Dummy clusters df
+cluster_ids = ["A", "A", "A", "B", "B"]
+clusters_data = {"cluster_id": cluster_ids, "person_id": person_ids}
+clusters = pd.DataFrame(clusters_data)
+
+# Expected dataframe
+expected_data = [
+    {"cluster_id": "A", "n_nodes": 3, "n_edges": 2.0, "density": 2 / 3},
+    {"cluster_id": "B", "n_nodes": 2, "n_edges": 1.0, "density": 1.0},
+]
+df_expected = pd.DataFrame(expected_data)
+
+
+def test_size_density():
+    # Linker with basic settings
+    settings = {"link_type": "dedupe_only", "unique_id_column_name": "person_id"}
+    linker = DuckDBLinker(df, settings)
+
+    # Register as Splink dataframes
+    df_predict = linker.register_table(edges, "df_predict", overwrite=True)
+    df_clustered = linker.register_table(clusters, "df_clustered", overwrite=True)
+
+    df_cluster_metrics = linker._compute_cluster_metrics(
+        df_predict, df_clustered, threshold_match_probability=0.99
+    )
+    df_cluster_metrics = df_cluster_metrics.as_pandas_dataframe()
+
+    assert_frame_equal(df_cluster_metrics, df_expected)
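Note: the expected values in this test can be cross-checked by replaying the SQL logic in pandas. An illustrative recomputation only, using the `edges` and `clusters` frames defined in the test above:

```python
# Mirror of the SQL pipeline: count qualifying edges per left-hand id,
# aggregate node and edge counts per cluster, then compute density.
relevant = edges[edges["match_probability"] >= 0.99]
count_edges = relevant.groupby("person_id_l").size().rename("count_edges")
per_cluster = (
    clusters.join(count_edges, on="person_id")
    .groupby("cluster_id")
    .agg(n_nodes=("person_id", "size"), n_edges=("count_edges", "sum"))
)
per_cluster["density"] = (
    2 * per_cluster["n_edges"]
    / (per_cluster["n_nodes"] * (per_cluster["n_nodes"] - 1))
)
# cluster A: n_nodes=3, n_edges=2.0, density=2/3
# cluster B: n_nodes=2, n_edges=1.0, density=1.0
```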
From a4cf7c6f147cba69316890a6193a8aae8df8a2be Mon Sep 17 00:00:00 2001
From: zslade
Date: Tue, 31 Oct 2023 17:00:07 +0000
Subject: [PATCH 08/14] Linted

---
 splink/linker.py              | 3 ++-
 tests/test_cluster_metrics.py | 1 +
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/splink/linker.py b/splink/linker.py
index 9792da3cdb..ee485f8eb6 100644
--- a/splink/linker.py
+++ b/splink/linker.py
@@ -2058,7 +2058,8 @@ def _compute_cluster_metrics(
                 threshold.
 
         Returns:
-            SplinkDataFrame: A SplinkDataFrame containing cluster IDs and selected cluster metrics
+            SplinkDataFrame: A SplinkDataFrame containing cluster IDs and selected
+                cluster metrics
 
         """
 
diff --git a/tests/test_cluster_metrics.py b/tests/test_cluster_metrics.py
index d746ea8947..da2fe0f5fb 100644
--- a/tests/test_cluster_metrics.py
+++ b/tests/test_cluster_metrics.py
@@ -1,5 +1,6 @@
 import pandas as pd
 from pandas.testing import assert_frame_equal
+
 from splink.duckdb.linker import DuckDBLinker
 
 # Dummy df

From 1f50b7ad45dc3d6012d35ab3518a208029a1373f Mon Sep 17 00:00:00 2001
From: zslade
Date: Tue, 7 Nov 2023 13:38:44 +0000
Subject: [PATCH 09/14] Add InputColumn and update arg name

---
 splink/cluster_metrics.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/splink/cluster_metrics.py b/splink/cluster_metrics.py
index f4a0d4b7c8..4b769e71de 100644
--- a/splink/cluster_metrics.py
+++ b/splink/cluster_metrics.py
@@ -1,5 +1,5 @@
 def _size_density_sql(
-    df_predict, df_clustered, threshold_match_probability, _unique_row_id
+    df_predict, df_clustered, threshold_match_probability, _unique_id_col
 ):
     """Generates sql for computing cluster size and density at a given threshold.
 
@@ -10,7 +10,7 @@ def _size_density_sql(
         threshold_match_probability (float): Filter the pairwise match
             predictions to include only pairwise comparisons with a
             match_probability above this threshold.
-        _unique_row_id (string): name of unique id column in settings dict
+        _unique_id_col (string): name of unique id column in settings dict
 
     Returns:
         sql string for computing cluster size and density
@@ -20,14 +20,17 @@ def _size_density_sql(
     # Get physical table names from Splink dataframes
     edges_table = df_predict.physical_name
     clusters_table = df_clustered.physical_name
 
+    input_col = InputColumn(_unique_id_col)
+    unique_id_col_l = input_col.name_l()
+
     sqls = []
     sql = f"""
     SELECT
-        {_unique_row_id}_l,
+        {unique_id_col_l},
         COUNT(*) AS count_edges
     FROM {edges_table}
     WHERE match_probability >= {threshold_match_probability}
-    GROUP BY {_unique_row_id}_l
+    GROUP BY {unique_id_col_l}
     """
 
@@ -39,7 +42,7 @@ def _size_density_sql(
         count(*) AS n_nodes,
         sum(e.count_edges) AS n_edges
     FROM {clusters_table} AS c
-    LEFT JOIN __count_edges e ON c.{_unique_row_id} = e.{_unique_row_id}_l
+    LEFT JOIN __count_edges e ON c.{_unique_id_col} = e.{unique_id_col_l}
     GROUP BY c.cluster_id
     """

From fafcd14fe523872a305b037c6f5a2d7f8086e775 Mon Sep 17 00:00:00 2001
From: zslade
Date: Tue, 7 Nov 2023 13:43:24 +0000
Subject: [PATCH 10/14] Updated arg value

---
 splink/linker.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/splink/linker.py b/splink/linker.py
index ee485f8eb6..eea3a2e50b 100644
--- a/splink/linker.py
+++ b/splink/linker.py
@@ -2070,7 +2070,7 @@ def _compute_cluster_metrics(
             df_predict,
             df_clustered,
             threshold_match_probability,
-            _unique_row_id=unique_id_col,
+            _unique_id_col=unique_id_col,
         )
 
         for sql in sqls:
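Note: `InputColumn` (introduced in PATCH 09) centralises how Splink derives the `_l`-suffixed column names used in pairwise comparison tables, rather than hand-concatenating `"_l"` onto the raw column name. A rough sketch of the behaviour relied on here; the exact output, including any quoting or escaping, is backend-dependent and an assumption on my part:

```python
from splink.input_column import InputColumn

input_col = InputColumn("person_id")
# Expected to yield the left-hand pairwise column name, e.g. "person_id_l"
# (possibly quoted for the target SQL dialect).
unique_id_col_l = input_col.name_l()
```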
{"sql": sql, "output_table_name": "__count_edges"} + sql = {"sql": sql, "output_table_name": "__splink__count_edges"} sqls.append(sql) sql = f""" @@ -42,10 +42,10 @@ def _size_density_sql( count(*) AS n_nodes, sum(e.count_edges) AS n_edges FROM {clusters_table} AS c - LEFT JOIN __count_edges e ON c.{_unique_id_col} = e.{unique_id_col_l} + LEFT JOIN __splink__count_edges e ON c.{_unique_id_col} = e.{unique_id_col_l} GROUP BY c.cluster_id """ - sql = {"sql": sql, "output_table_name": "__counts_per_cluster"} + sql = {"sql": sql, "output_table_name": "__splink__counts_per_cluster"} sqls.append(sql) sql = """ @@ -54,7 +54,7 @@ def _size_density_sql( n_nodes, n_edges, (n_edges * 2)/(n_nodes * (n_nodes-1)) AS density - FROM __counts_per_cluster + FROM __splink__counts_per_cluster """ sql = {"sql": sql, "output_table_name": "__splink__cluster_metrics_clusters"} sqls.append(sql) From 18de56419727d2f0268d52960b2d1ec519ebbf2e Mon Sep 17 00:00:00 2001 From: zslade Date: Tue, 7 Nov 2023 14:24:19 +0000 Subject: [PATCH 12/14] Imported InputColumn --- splink/cluster_metrics.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/splink/cluster_metrics.py b/splink/cluster_metrics.py index c5811af45c..a39314a43e 100644 --- a/splink/cluster_metrics.py +++ b/splink/cluster_metrics.py @@ -1,3 +1,5 @@ +from splink.input_column import InputColumn + def _size_density_sql( df_predict, df_clustered, threshold_match_probability, _unique_id_col ): From 533bd501f23a796942bbbdb663a43725f5384732 Mon Sep 17 00:00:00 2001 From: zslade Date: Tue, 7 Nov 2023 14:25:27 +0000 Subject: [PATCH 13/14] lint with black --- splink/cluster_metrics.py | 1 + 1 file changed, 1 insertion(+) diff --git a/splink/cluster_metrics.py b/splink/cluster_metrics.py index a39314a43e..d15977245a 100644 --- a/splink/cluster_metrics.py +++ b/splink/cluster_metrics.py @@ -1,5 +1,6 @@ from splink.input_column import InputColumn + def _size_density_sql( df_predict, df_clustered, threshold_match_probability, _unique_id_col ): From 7570b63a004cea9788080cc29d07f5515e6b58df Mon Sep 17 00:00:00 2001 From: Zoe Slade Date: Wed, 8 Nov 2023 13:53:43 +0000 Subject: [PATCH 14/14] Use settings obj instead of dict Co-authored-by: ADBond <48208438+ADBond@users.noreply.github.com> --- splink/linker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splink/linker.py b/splink/linker.py index eea3a2e50b..afd8fc87e0 100644 --- a/splink/linker.py +++ b/splink/linker.py @@ -2064,7 +2064,7 @@ def _compute_cluster_metrics( """ # Get unique row id column name from settings - unique_id_col = self._settings_dict["unique_id_column_name"] + unique_id_col = self._settings_obj._unique_id_column_name sqls = _size_density_sql( df_predict,