
Add ability to do efficient blocking based on list/array intersections #1664

Closed
wants to merge 9 commits

Conversation

nerskin
Contributor

@nerskin nerskin commented Oct 24, 2023

Type of PR

  • BUG
  • FEAT
  • MAINT
  • DOC

Is your Pull Request linked to an existing Issue or Pull Request?

This pull request follows on from the discussion under issue #1448 (sorry I took so long to send this PR in!)

Give a brief description for the solution you have provided

This adds the ability to provide blocking rules that compare a pair of records according to whether their array-valued columns intersect. I do this by unnesting/exploding the array-valued column(s) before joining the input tables, and then extracting the set of distinct pairs of ids which have at least one pair of rows which agree on the provided blocking rule.

For example, the rule

{
    "blocking_rule": "l.address_history = r.address_history and l.first_name = r.first_name",
    "arrays_to_explode": ["address_history"],
}

will result in a pair of records being compared if their address histories overlap and their first names agree. It is also possible to unnest/explode more than one column at once in a blocking rule.
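
To make the mechanics concrete, here is a rough DuckDB-style sketch of what exploding does for the rule above (a sketch only; the table and column names are illustrative, not what Splink generates):

-- Explode the array column, join on the exploded value plus the other
-- blocking condition, then keep the distinct pairs of ids.
WITH exploded AS (
    SELECT unique_id, first_name, UNNEST(address_history) AS address
    FROM input_records
)
SELECT DISTINCT
    l.unique_id AS unique_id_l,
    r.unique_id AS unique_id_r
FROM exploded AS l
INNER JOIN exploded AS r
    ON l.address = r.address
    AND l.first_name = r.first_name
WHERE l.unique_id < r.unique_id;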

Pairs of ids returned by these blocking rules are stored with the BlockingRule object and filtered out from subsequent blocking rules.

PR Checklist

  • Added documentation for changes - not yet
  • Added feature to example notebooks or tutorial (if appropriate)
  • Added tests (if appropriate)
  • Made changes based off the latest version of Splink
  • Run the linter

@RobinL RobinL self-assigned this Oct 24, 2023
@RobinL
Member

RobinL commented Oct 24, 2023

Thank you very much! I am off for the rest of this week but I will try and find time to look at this in depth next week.

Incidentally, we've been in contact with another big org in the UK Gov who has been struggling a bit with difficult array problems (to explode or not to explode) and for whom I think this should be helpful!

Using this comment for a few notes as I look through this.

Testing script (DuckDB)
import pandas as pd

import splink.duckdb.duckdb_comparison_library as cl
from splink.duckdb.duckdb_linker import DuckDBLinker

from splink.blocking import (
    block_using_rules_sql,
)

data = [
    {"unique_id": 1, "first_name": "John", "surname": "Doe", "postcode": ["A", "B"]},
    {"unique_id": 2, "first_name": "John", "surname": "Doe", "postcode": ["B"]},
    {"unique_id": 3, "first_name": "John", "surname": "Smith", "postcode": ["A"]},
]

df = pd.DataFrame(data)


settings = {
    "link_type": "dedupe_only",
    "blocking_rules_to_generate_predictions": [
        {
            "blocking_rule": "l.postcode = r.postcode and l.first_name = r.first_name",
            "arrays_to_explode": ["postcode"],
        },
        "l.surname = r.surname",
    ],
    "comparisons": [
        cl.exact_match("first_name"),
        cl.exact_match("surname"),
        cl.exact_match("postcode"),
    ],
    "retain_intermediate_calculation_columns": True,
}


linker = DuckDBLinker(df, settings)

concat_with_tf = linker._initialise_df_concat_with_tf()
sql = block_using_rules_sql(linker)
linker._enqueue_sql(sql, "__splink__df_blocked")
sql_gen = linker._pipeline._generate_pipeline([concat_with_tf])
print(sql_gen)

linker.predict().as_pandas_dataframe()
Testing script (Spark)
import pandas as pd

from pyspark.context import SparkConf, SparkContext
from pyspark.sql import SparkSession

from splink.spark.comparison_library import exact_match
from splink.spark.linker import SparkLinker

conf = SparkConf()
conf.set("spark.driver.memory", "12g")
conf.set("spark.sql.shuffle.partitions", "2")

sc = SparkContext.getOrCreate(conf=conf)
sc.setCheckpointDir("tmp_checkpoints/")
spark = SparkSession(sc)

data = [
    {"unique_id": 1, "first_name": "John", "surname": "Doe", "postcode": ["A", "B"]},
    {"unique_id": 2, "first_name": "John", "surname": "Doe", "postcode": ["B"]},
    {"unique_id": 3, "first_name": "John", "surname": "Smith", "postcode": ["A"]},
]

df = pd.DataFrame(data)
spark_df = spark.createDataFrame(df)

settings = {
    "link_type": "dedupe_only",
    "blocking_rules_to_generate_predictions": [
        {
            "blocking_rule": "l.postcode = r.postcode and l.first_name = r.first_name",
            "arrays_to_explode": ["postcode"],
        },
        "l.surname = r.surname",
    ],
    "comparisons": [
        exact_match("first_name"),
        exact_match("surname"),
        exact_match("postcode"),
    ],
    "retain_intermediate_calculation_columns": True,
}

linker = SparkLinker(spark_df, settings, input_table_aliases="fake_data_1")

linker.predict().as_pandas_dataframe()


@RobinL
Member

RobinL commented Nov 1, 2023

@nerskin this is amazing!

I've run a few examples, and looked through the resultant SQL. I'm happy with the implementation in terms of the SQL logic.

I want to take a look at whether I can refactor the implementation of block_using_rules_sql and the execution flow to move the part that creates the ID tables (e.g. ids_to_compare_blocking_rule_0_3057bc251) to a separate function - probably something that executes in advance of the main blocking, so the ID tables are ready and waiting.

Just to try and keep a lid on the complexity of the block_using_rules_sql function.

I presume I'm right in saying it was a conscious decision on your part to materialise these tables for performance reasons? (That would make sense to me too)

If it's OK with you, I'll branch off your PR and see where I get to. If I manage to do something, I'll check back in with you before merging the changed version. If I hit a stumbling block, we can probably just go ahead and merge this as is.

@nerskin
Contributor Author

nerskin commented Nov 1, 2023

> If it's OK with you, I'll branch off your PR and see where I get to. If I manage to do something, I'll check back in with you before merging the changed version. If I hit a stumbling block, we can probably just go ahead and merge this as is.

Thanks @RobinL , that sounds good. I'm off work until the middle of next week, so I won't be able to look at it until I get back (although @aymonwuolanne might be able to help)

Yes, materializing the id tables was a conscious decision to try to improve performance.

@RobinL RobinL closed this Nov 1, 2023
@RobinL RobinL reopened this Nov 1, 2023
@RobinL
Member

RobinL commented Nov 1, 2023

Oops, sorry, clicked close by accident

@RobinL
Member

RobinL commented Nov 1, 2023

Been struggling with this today, not because of your code, but because the wider blocking.py is pretty messy (too many overrides/flags trying to accommodate too many situations). So sorry about the complexity here.

I do think I've found an issue with ambiguity over unique ids. I don't think it's too hard to fix, but here's a demo script to illustrate:

Demo script
import pandas as pd
import sqlglot
from sqlglot.optimizer import optimize
import splink.duckdb.duckdb_comparison_library as cl
from splink.blocking import (
    block_using_rules_sql,
)
from splink.duckdb.blocking_rule_library import block_on
from splink.duckdb.linker import DuckDBLinker

data_1 = [
    {"unique_id": 1, "first_name": "John", "surname": "Doe", "postcode": ["A", "B"]},
    {"unique_id": 3, "first_name": "John", "surname": "Doe", "postcode": ["B"]},
]

data_2 = [
    {"unique_id": 3, "first_name": "John", "surname": "Smith", "postcode": ["A"]},
]

data_3 = [
    {"unique_id": 3, "first_name": "John", "surname": "Smith", "postcode": ["A"]},
]

df_1 = pd.DataFrame(data_1)
df_2 = pd.DataFrame(data_2)
df_3 = pd.DataFrame(data_3)


settings = {
    "link_type": "link_and_dedupe",
    "blocking_rules_to_generate_predictions": [
        {
            "blocking_rule": "l.postcode = r.postcode and l.first_name = r.first_name",
            "arrays_to_explode": ["postcode"],
        },
        "l.surname = r.surname",
    ],
    "comparisons": [
        cl.exact_match("first_name"),
        cl.exact_match("surname"),
        cl.exact_match("postcode"),
    ],
    "retain_intermediate_calculation_columns": True,
}


linker = DuckDBLinker([df_1, df_2, df_3], settings)
sdf = linker.predict()
linker.query_sql(f"""select * from {sdf.physical_name} 
                 where unique_id_l = 3 and unique_id_r = 3
                 order by source_dataset_l, source_dataset_r""")

The issue is that when you have multiple input dataframes, it doesn't suffice for the ids_to_compare table to be a list of unique_id_l, unique_id_r pairs. Instead, the table needs both source_dataset and unique_id in it, or alternatively the two can be concatenated into a single key.

I think the concatenation may be best in this case because it's easier to check for EXISTS on a single column

The underlying issue being that we only assume that unique_ids are unique within each input dataframe, not between them.
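
As a rough sketch of the concatenated-key idea (names like candidate_pairs, ids_to_compare and the separator are illustrative, not what Splink actually generates):

-- Build one key per side by concatenating source_dataset and unique_id,
-- so the anti-join only needs to match a single column per side.
SELECT c.*
FROM candidate_pairs AS c
WHERE NOT EXISTS (
    SELECT 1
    FROM ids_to_compare AS i
    WHERE i.concat_id_l = c.source_dataset_l || '-__-' || CAST(c.unique_id_l AS VARCHAR)
      AND i.concat_id_r = c.source_dataset_r || '-__-' || CAST(c.unique_id_r AS VARCHAR)
);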

One thing I wanted to check: do we think this is only useful for blocking rules for prediction? That's been my assumption (I don't see a scenario in which it should be used for EM training). Do you agree?

@RobinL
Member

RobinL commented Nov 1, 2023

My overall view is that the interaction with the salting rules gets fiendishly complicated to the point where it's really hard to understand what's going on. (I'm impressed you managed to make it work actually!)

What's your feeling about this? I wondered what you thought about preventing the user from doing both salting AND exploding? Would that be a problem for your use case?

One other thing comes to mind re: simplification.

Could you shunt all blocking rules that have 'arrays_to_explode' to the bottom of the list?

The logic for creating pairs would then be simplified to:

  • Generate the ids_to_compare table(s) from all the arrays_to_explode blocking rules. It's easy to filter out the 'normal' blocking rules from this table during creation.

  • Where there are several arrays_to_explode blocking rules, you could generate this with a DISTINCT rather than writing SQL logic that attempts to deduplicate, i.e. UNION ALL the ids_to_compare tables and then run DISTINCT (see the sketch after this list).

(I'm not sure this would be significantly less efficient than the current AND NOT EXISTS logic.)

  • Then, in generating the SQL for the full table of pairwise comparisons:
    • First generate SQL statements for all 'normal' blocking rules, including salted rules
    • Next generate a SQL statement for the remaining ids_to_compare
    • UNION ALL the above
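
A minimal sketch of that UNION ALL / DISTINCT step (the ids_to_compare_blocking_rule_* names are illustrative):

-- Combine the id tables produced by each exploding blocking rule,
-- then deduplicate the pairs in a single pass.
SELECT DISTINCT unique_id_l, unique_id_r
FROM (
    SELECT unique_id_l, unique_id_r FROM ids_to_compare_blocking_rule_0
    UNION ALL
    SELECT unique_id_l, unique_id_r FROM ids_to_compare_blocking_rule_1
) AS combined;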

@aymonwuolanne
Contributor

Hi Robin,

Thanks for your comments. I think the unique id / source dataset problem should be a fairly easy fix.

I can't think of any reasons it shouldn't be used for EM estimation, but I guess it may be more efficient in that case to block on postcode[1] since it doesn't matter if we leave out some true links when estimating the parameters.
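
As a rough sketch of that cheaper alternative (DuckDB list indexing is 1-based; the table name is illustrative):

-- Block on only the first element of the array rather than exploding it;
-- as a Splink blocking rule this would just be "l.postcode[1] = r.postcode[1]".
SELECT
    l.unique_id AS unique_id_l,
    r.unique_id AS unique_id_r
FROM input_records AS l
INNER JOIN input_records AS r
    ON l.postcode[1] = r.postcode[1]
WHERE l.unique_id < r.unique_id;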

I think it's worth getting it working for both salting and exploding; our use case would probably be fine without salting, but the solution as it stands seems to work OK with salting. Currently this works by creating an ids_to_compare_blocking_rule table for each salting partition. Since the pipeline is executed inside that loop, I can see a simplification where we UNION ALL these tables just after the loop, so you'd only have one ids_to_compare_blocking_rule table for a given unnest blocking rule. I don't know that it's any more efficient, but it would reduce the number of tables that have to be created.

For your simplification suggestion, would the match_key column be a problem for running a SELECT DISTINCT over the UNION ALL in that scenario? e.g. unique_id_l = 1, unique_id_r = 2, match_key = 0 and unique_id_l = 1, unique_id_r = 2, match_key = 1 would be kept as separate rows.

@RobinL
Member

RobinL commented Nov 2, 2023

Apologies - to clarify, other than the unique id issue, I haven't found any problems with the implementation. I agree salting is working correctly as is.

My concern at the moment is maintainability of the codebase.

One of the things the team is working on is tech debt and trying to make things simpler, and at the moment, I worry that when we come to try and refactor this code, it will be hard to get it right, because of its complexity.

I'll continue to do a few experiments. There's a slight refactor here, but I want to do a bit more work on it.

On the match key issue - yeah, I agree it's a bit fiddly, but perhaps a

SELECT 
    uid_l, 
    uid_r, 
    MIN(match_key) as match_key
FROM 
    your_table_name
GROUP BY 
    uid_l, 
    uid_r;

could work.

For the moment I'll proceed on the basis that we'll try and leave the logic as is, but maybe move things about a bit and rename things to try and help future readers of the code understand it.

@aymonwuolanne
Contributor

Absolutely, if things can be shuffled around to make it simpler and easier to maintain then that is definitely worth it.

@RobinL
Member

RobinL commented Nov 15, 2023

Just to note we are making slow but steady progress with this.

We're doing quite a lot of refactoring at the moment, and are taking this as an opportunity to refactor blocking rules more generally i.e. get the baseline implementation right before adding this additional feature.

We've merged a couple of PRs already on this, so we're inching closer to being able to add this on top

@lamaeldo

Just out of curiosity, I don't suppose there would be an efficient way to block on an array intersection specifying a specific size of the intersection, or a ratio such as intersection/max(len(array_l), len(array_r))?

@RobinL
Member

RobinL commented May 12, 2024

Not really, sorry
