Add ANN algorithm for large number of records for similarity strategy
mjawadtp committed Nov 5, 2024
1 parent ada400f commit 40097c1
Showing 2 changed files with 205 additions and 4 deletions.
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
@@ -2,7 +2,7 @@ default_language_version:
   python: python3
 repos:
   - repo: https://github.com/ambv/black
-    rev: 22.3.0
+    rev: 24.10.0
     hooks:
       - id: black
   - repo: https://github.com/pre-commit/pre-commit-hooks
@@ -18,12 +18,12 @@ repos:
       - id: rst-linter
         exclude: "docs"
   - repo: https://github.com/pycqa/isort
-    rev: 5.12.0
+    rev: 5.13.2
     hooks:
       - id: isort
         args: ["--profile", "black", "--filter-files"]
   - repo: https://github.com/pre-commit/mirrors-prettier
-    rev: v2.5.1
+    rev: v4.0.0-alpha.8
     hooks:
       - id: prettier
   - repo: local
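With the hook revisions bumped, the updated hooks can be refreshed and exercised locally with the standard pre-commit CLI (generic usage, not part of this commit):

pre-commit clean
pre-commit install --install-hooks
pre-commit run --all-files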
203 changes: 202 additions & 1 deletion cumulusci/tasks/bulkdata/select_utils.py
@@ -2,6 +2,12 @@
import re
import typing as T

import numpy as np
import pandas as pd
from annoy import AnnoyIndex
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.preprocessing import StandardScaler

from cumulusci.core.enums import StrEnum
from cumulusci.tasks.bulkdata.extract_dataset_utils.hardcoded_default_declarations import (
    DEFAULT_DECLARATIONS,
@@ -159,14 +165,115 @@ def similarity_generate_query(


def similarity_post_process(
-    load_records: list, query_records: list, sobject: str
+    load_records, query_records: list, sobject: str
) -> T.Tuple[T.List[dict], T.Union[str, None]]:
"""Processes the query results for the similarity selection strategy"""
# Handle case where query returns 0 records
if not query_records:
error_message = f"No records found for {sobject} in the target org."
return [], error_message

load_records = list(load_records)
load_record_count, query_record_count = len(load_records), len(query_records)

complexity_constant = load_record_count * query_record_count

print(complexity_constant)

closest_records = []

if complexity_constant < 1000:
closest_records = annoy_post_process(load_records, query_records)
else:
closest_records = levenshtein_post_process(load_records, query_records)

print(closest_records)

return closest_records

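For orientation, a minimal sketch of the dispatch (illustrative only, not part of the commit; the record shapes follow the code above, with each query record carrying its id in the first position):

# 40 load records x 30 query records = 1200 >= 1000, so the Annoy path runs;
# a 5 x 5 workload (25 < 1000) would use the Levenshtein path instead.
load = [["Acme", "100"]] * 40
query = [["001000000000001AAA", "Acme Corp", "120"]] * 30  # hypothetical id
records, error = similarity_post_process(load, query, "Account")
# records is a list of {"id": ..., "success": True, "created": False} dicts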

def annoy_post_process(
    load_records: list, query_records: list
) -> T.Tuple[T.List[dict], T.Union[str, None]]:
    """Processes the query results for the similarity selection strategy using the Annoy algorithm for a large number of records"""

    query_records = replace_empty_strings_with_missing(query_records)
    load_records = replace_empty_strings_with_missing(load_records)

    hash_features = 100
    num_trees = 10

    # Split each query record into its Salesforce id and its comparable field values
    query_record_ids = [record[0] for record in query_records]
    query_record_data = [record[1:] for record in query_records]

    # Map each field tuple back to its record id so matches can be resolved later
    record_to_id_map = {
        tuple(query_record_data[i]): query_record_ids[i]
        for i in range(len(query_records))
    }

    final_load_vectors, final_query_vectors = vectorize_records(
        load_records, query_record_data, hash_features=hash_features
    )

    # Create Annoy index for nearest neighbor search
    vector_dimension = final_query_vectors.shape[1]
    annoy_index = AnnoyIndex(vector_dimension, "euclidean")

    for i in range(len(final_query_vectors)):
        annoy_index.add_item(i, final_query_vectors[i])

    # Build the index
    annoy_index.build(num_trees)

    # Find the single nearest neighbor for each load vector
    n_neighbors = 1

    closest_records = []

    for load_vector in final_load_vectors:
        # Get the indices of the nearest query vectors
        neighbor_indices = annoy_index.get_nns_by_vector(load_vector, n_neighbors)

        for neighbor_index in neighbor_indices:
            # Resolve the matched query record back to its Salesforce id
            record = query_record_data[neighbor_index]
            closest_record_id = record_to_id_map[tuple(record)]
            closest_records.append(
                {"id": closest_record_id, "success": True, "created": False}
            )

    return closest_records, None

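For reference, a tiny self-contained Annoy example (2-D vectors assumed; not taken from this commit) showing the index lifecycle used above (add items, build trees, query by vector):

from annoy import AnnoyIndex

index = AnnoyIndex(2, "euclidean")  # vectors have 2 dimensions
index.add_item(0, [0.0, 0.0])
index.add_item(1, [1.0, 1.0])
index.build(10)  # 10 trees, matching num_trees above
ids, distances = index.get_nns_by_vector([0.9, 1.1], 1, include_distances=True)
# ids == [1]: item 1 is the nearest stored vector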

def levenshtein_post_process(
    load_records: list, query_records: list
) -> T.Tuple[T.List[dict], T.Union[str, None]]:
    """Processes the query results for the similarity selection strategy using the Levenshtein algorithm for a small number of records"""
    closest_records = []

    for record in load_records:
@@ -300,3 +407,97 @@ def add_limit_offset_to_user_filter(
        filter_clause += f" OFFSET {offset_clause}"

    return f" {filter_clause}"


def determine_field_types(df):
    numerical_features = []
    boolean_features = []
    categorical_features = []

    for col in df.columns:
        # Check if the column can be converted to numeric
        try:
            # Attempt to convert to numeric
            df[col] = pd.to_numeric(df[col], errors="raise")
            numerical_features.append(col)
        except ValueError:
            # Check for boolean values
            if df[col].str.lower().isin(["true", "false"]).all():
                # Map to actual boolean values
                df[col] = df[col].str.lower().map({"true": True, "false": False})
                boolean_features.append(col)
            else:
                categorical_features.append(col)

    return numerical_features, boolean_features, categorical_features

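A small illustration of the classification (hypothetical data; note the function also normalizes the DataFrame's columns in place):

df = pd.DataFrame(
    {
        "Amount": ["100", "250.5"],  # parses as numeric
        "IsActive": ["true", "false"],  # all-boolean strings
        "Name": ["Acme", "Globex"],  # falls through to categorical
    }
)
num, booleans, cat = determine_field_types(df)
# num == ["Amount"], booleans == ["IsActive"], cat == ["Name"]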

def vectorize_records(db_records, query_records, hash_features):
    # Convert database records and query records to DataFrames
    df_db = pd.DataFrame(db_records)
    df_query = pd.DataFrame(query_records)

    # Dynamically determine field types
    numerical_features, boolean_features, categorical_features = determine_field_types(
        df_db
    )

    # Fit StandardScaler on the numerical features of the database records
    scaler = StandardScaler()
    if numerical_features:
        df_db[numerical_features] = scaler.fit_transform(df_db[numerical_features])
        df_query[numerical_features] = scaler.transform(df_query[numerical_features])

    # Use HashingVectorizer to transform the categorical features
    hashing_vectorizer = HashingVectorizer(
        n_features=hash_features, alternate_sign=False
    )

    # For db_records
    hashed_categorical_data_db = []
    for col in categorical_features:
        hashed_db = hashing_vectorizer.fit_transform(df_db[col]).toarray()
        hashed_categorical_data_db.append(hashed_db)

    # For query_records
    hashed_categorical_data_query = []
    for col in categorical_features:
        hashed_query = hashing_vectorizer.transform(df_query[col]).toarray()
        hashed_categorical_data_query.append(hashed_query)

    # Combine all feature types into a single vector for the database records
    db_vectors = []
    if numerical_features:
        db_vectors.append(df_db[numerical_features].values)
    if boolean_features:
        db_vectors.append(
            df_db[boolean_features].astype(int).values
        )  # Convert boolean to int
    if hashed_categorical_data_db:
        db_vectors.append(np.hstack(hashed_categorical_data_db))

    # Concatenate database vectors
    final_db_vectors = np.hstack(db_vectors)

    # Combine all feature types into a single vector for the query records
    query_vectors = []
    if numerical_features:
        query_vectors.append(df_query[numerical_features].values)
    if boolean_features:
        query_vectors.append(
            df_query[boolean_features].astype(int).values
        )  # Convert boolean to int
    if hashed_categorical_data_query:
        query_vectors.append(np.hstack(hashed_categorical_data_query))

    # Concatenate query vectors
    final_query_vectors = np.hstack(query_vectors)

    return final_db_vectors, final_query_vectors

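A hypothetical call showing the resulting shapes (one categorical column hashed into 100 features plus one scaled numeric column):

db = [["Acme", "100"], ["Globex", "200"], ["Initech", "300"]]
qry = [["Acme Corp", "120"], ["Globex Inc", "210"]]
db_vecs, qry_vecs = vectorize_records(db, qry, hash_features=100)
# db_vecs.shape == (3, 101) and qry_vecs.shape == (2, 101):
# 1 standardized numeric column + 100 hashed categorical features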

def replace_empty_strings_with_missing(records):
    return [
        [(field if field != "" else "missing") for field in record]
        for record in records
    ]

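And a quick check of the helper's behavior (illustrative):

replace_empty_strings_with_missing([["Acme", ""], ["", "42"]])
# -> [["Acme", "missing"], ["missing", "42"]]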