-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #39 from uktrade/feature/orpd-54-search-terms
chore:enhance search functionality with combined search terms
- Loading branch information
Showing
7 changed files
with
222 additions
and
85 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
from orp_search.config import SearchDocumentConfig | ||
|
||
from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator | ||
|
||
|
||
def paginate(context, config: SearchDocumentConfig, search_results):
    """
    Paginate search results and store pagination details in the context.

    The page requested via ``config.offset`` is used when valid. A
    non-integer page falls back to the first page and an out-of-range
    page falls back to the last page.

    Each document on the selected page is adjusted in place for display:
    a non-empty ``description`` longer than 100 characters is cut to 100
    characters followed by "...", and ``regulatory_topics`` is converted
    to a string and split on newlines into a list.

    :param context: Mapping that receives the pagination entries.
    :param config: Search configuration; ``limit`` is the page size and
        ``offset`` is the requested page number.
    :param search_results: The collection of results to paginate.
    :return: The same ``context`` object, updated.
    """
    paginator = Paginator(search_results, config.limit)
    try:
        paginated_documents = paginator.page(config.offset)
    except PageNotAnInteger:
        paginated_documents = paginator.page(1)
    except EmptyPage:
        paginated_documents = paginator.page(paginator.num_pages)

    # Iterating an empty page is a no-op, so no emptiness guard is needed.
    for paginated_document in paginated_documents:
        if "description" in paginated_document:
            description = paginated_document["description"]

            # Only non-empty descriptions are candidates for truncation
            if description and len(description) > 100:
                paginated_document["description"] = description[:100] + "..."

        if "regulatory_topics" in paginated_document:
            paginated_document["regulatory_topics"] = str(
                paginated_document["regulatory_topics"]
            ).split("\n")

    context["paginator"] = paginator
    context["results"] = paginated_documents
    context["results_count"] = len(paginated_documents)
    context["is_paginated"] = paginator.num_pages > 1
    context["results_total_count"] = paginator.count
    context["results_page_total"] = paginator.num_pages
    # Report the page actually selected: after one of the fallbacks above
    # config.offset no longer matches the page being rendered.
    context["current_page"] = paginated_documents.number
    context["start_index"] = paginated_documents.start_index()
    context["end_index"] = paginated_documents.end_index()

    return context
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from datetime import datetime, timezone | ||
|
||
import dateutil.parser # type: ignore | ||
|
||
|
||
def parse_date(date_value):
    """
    Convert a datetime or date string into a timezone-aware datetime.

    Strings are parsed with ``dateutil.parser.parse``. A value without
    timezone information is tagged as UTC (the clock time is unchanged);
    a value that already carries a timezone is returned as is.

    :param date_value: A ``datetime`` instance or a date string.
    :return: A timezone-aware ``datetime``, or ``None`` when the value is
        of another type or the string cannot be parsed.
    """
    if isinstance(date_value, str):
        try:
            date_value = dateutil.parser.parse(date_value)
        except (ValueError, OverflowError):
            # The parser raises OverflowError (not ValueError) for numeric
            # strings that are too large; treat that as unparseable too.
            return None

    if not isinstance(date_value, datetime):
        return None  # Return None for invalid date types

    if date_value.tzinfo is None:
        # If the datetime is offset-naive, make it offset-aware in UTC
        return date_value.replace(tzinfo=timezone.utc)
    return date_value
|
||
|
||
def calculate_score(search_result, search_terms):
    """
    Calculate the score of a search result as the total number of
    case-insensitive occurrences of the search terms in its title and
    description.

    Every occurrence counts, so a term that appears three times adds
    three to the score. Empty terms are ignored.

    :param search_result: A dictionary containing the search result;
        missing or ``None`` "title"/"description" values count as empty.
    :param search_terms: A list of search terms to look for in the
        search result.
    :return: The summed occurrence count over all search terms.
    """
    title = search_result.get("title", "") or ""
    description = search_result.get("description", "") or ""
    combined_content = title.lower() + " " + description.lower()

    # Skip empty terms: str.count("") returns len(text) + 1, which would
    # inflate the score with a value unrelated to any real match.
    return sum(
        combined_content.count(term.lower())
        for term in (search_terms or ())
        if term
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
import re | ||
|
||
|
||
def sanitize_input(search):
    """
    Strip characters and keywords commonly used in SQL injection
    attempts from a search string.

    The following are deleted (keywords case-insensitively and only as
    whole words): the comment marker ``--``, the keywords SELECT, INSERT,
    DELETE, UPDATE, DROP, ALTER, EXEC, UNION and CREATE, single quotes,
    double quotes and semicolons. Leading and trailing whitespace is then
    trimmed from the result.
    """
    unsafe_pattern = re.compile(
        r"(--|\b(SELECT|INSERT|DELETE|UPDATE|DROP|ALTER|EXEC|UNION"
        r"|CREATE)\b|'|\"|;)",
        re.IGNORECASE,
    )
    cleaned = unsafe_pattern.sub("", search)
    return cleaned.strip()
|
||
|
||
def parse_search_terms(search):
    """
    Split a search string into terms joined by AND and terms joined by OR.

    The input is sanitized first. "AND" and "+" switch the current
    connector to AND, "OR" switches it to OR, and the connector stays in
    effect until the next one is seen. Terms met before any connector are
    treated as AND terms. A double-quoted phrase is kept together as one
    term.

    NOTE: ``sanitize_input`` currently removes double quotes, so the
    quoted-phrase handling below only takes effect if the sanitizer is
    changed to let quotes through.

    :param search: The raw search string.
    :return: A tuple ``(search_terms_and, search_terms_or)`` of lists.
    """
    # Sanitize input before processing
    search = sanitize_input(search)

    search_terms_and = []
    search_terms_or = []

    # Input made up solely of "AND", "OR", "+" or whitespace has no terms
    if re.fullmatch(r"(AND|OR|\+|\s)+", search):
        return search_terms_and, search_terms_or

    # Split the search string into tokens based on spaces and keywords
    tokens = re.split(r"(\s+|\bAND\b|\bOR\b|\+)", search)

    # Words of the quoted phrase currently being collected. One buffer is
    # used whatever the connector, so the opening word of an OR-connected
    # phrase is no longer lost when the phrase is closed.
    phrase_words = []
    in_quotes = False
    current_connector = None  # Track AND/OR status outside of quotes

    for token in tokens:
        token = token.strip()
        if not token:
            continue

        # No connector yet behaves like AND
        if current_connector == "OR":
            target_terms = search_terms_or
        else:
            target_terms = search_terms_and

        if token.startswith('"') and token.endswith('"'):
            # Complete quoted term in one token
            target_terms.append(token.strip('"'))
            continue
        if token.startswith('"'):
            # Opening word of a multi-token phrase
            in_quotes = True
            phrase_words = [token.strip('"')]
            continue
        if token.endswith('"'):
            # A closing quote with no open phrase is ignored
            if in_quotes:
                phrase_words.append(token.strip('"'))
                target_terms.append(" ".join(phrase_words))
                in_quotes = False
            continue

        if in_quotes:
            # Connector words inside a phrase are part of the phrase
            phrase_words.append(token)
            continue

        # Treat both + and AND as equivalent for "AND" logic
        keyword = token.upper()
        if keyword == "AND" or token == "+":  # nosec BXXX
            current_connector = "AND"
        elif keyword == "OR":
            current_connector = "OR"
        else:
            # Individual term outside quotes
            target_terms.append(token)

    return search_terms_and, search_terms_or
|
||
|
||
def combine_search_terms(search_terms_and, search_terms_or):
    """
    Build a single query string from the AND terms and the OR terms.

    The AND terms are joined with " AND " and the OR terms with " OR ".
    When both groups are non-empty they are linked with " OR " (no
    parentheses are added); otherwise the non-empty group is returned on
    its own, or an empty string when there are no terms at all.
    """
    # An empty or missing group contributes an empty clause
    and_clause = " AND ".join(search_terms_and or ())
    or_clause = " OR ".join(search_terms_or or ())

    # Keep only the clauses that have content and link them with OR
    clauses = [clause for clause in (and_clause, or_clause) if clause]
    return " OR ".join(clauses)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters