Skip to content

Commit

Permalink
Merge branch 'main' of github.com:uktrade/orp into spike/add-react-frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
gdbarnes committed Oct 31, 2024
2 parents 71dfa20 + 5c6bc78 commit ee0569e
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 85 deletions.
10 changes: 10 additions & 0 deletions orp/orp_search/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging

from orp_search.utils.terms import combine_search_terms, parse_search_terms

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -35,6 +37,14 @@ def __init__(
self.sort_by = sort_by
self.id = id

# Parse search terms
search_terms_and, search_terms_or = parse_search_terms(search_terms)
self.search_terms_and = search_terms_and
self.search_terms_or = search_terms_or
self.final_search_expression = combine_search_terms(
search_terms_and, search_terms_or
)

def validate(self):
"""
Expand Down
10 changes: 6 additions & 4 deletions orp/orp_search/legislation.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ def __init__(self):
def search(self, config: SearchDocumentConfig):
logger.info("searching legislation...")

logger.info(
f"final_search_expression terms: {config.final_search_expression}"
)

# List of search terms
title_search_terms = config.search_terms
search_terms = ",".join(title_search_terms)
headers = {"Accept": "application/atom+xml"}
params = {
"lang": "en",
"title": search_terms,
"text": search_terms,
"title": config.final_search_expression,
"text": config.final_search_expression,
"results-count": 20,
}

Expand Down
Empty file.
44 changes: 44 additions & 0 deletions orp/orp_search/utils/paginate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from orp_search.config import SearchDocumentConfig

from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator


def paginate(context, config: SearchDocumentConfig, search_results):
    """
    Paginate search results and record the pagination details in ``context``.

    ``config.limit`` is used as the page size and ``config.offset`` as the
    requested page number. A non-integer page falls back to page 1 and an
    out-of-range page falls back to the last page.

    Documents on the selected page are modified in place for display:
    a ``description`` longer than 100 characters is cut to 100 characters
    plus ``"..."``, and ``regulatory_topics`` is converted to a string and
    split on newlines into a list.

    :param context: Template context dictionary; updated in place.
    :param config: Search configuration providing ``limit`` and ``offset``.
    :param search_results: Sequence of result dictionaries to paginate.
    :return: The same ``context`` object with the pagination keys added.
    """
    paginator = Paginator(search_results, config.limit)
    try:
        paginated_documents = paginator.page(config.offset)
    except PageNotAnInteger:
        paginated_documents = paginator.page(1)
    except EmptyPage:
        paginated_documents = paginator.page(paginator.num_pages)

    # Iterating an empty page is a no-op, so no emptiness guard is needed.
    for paginated_document in paginated_documents:
        if "description" in paginated_document:
            description = paginated_document["description"]

            # Leave empty/None descriptions untouched
            if description:
                # Truncate description to 100 characters
                paginated_document["description"] = (
                    description[:100] + "..."
                    if len(description) > 100
                    else description
                )
        if "regulatory_topics" in paginated_document:
            paginated_document["regulatory_topics"] = str(
                paginated_document["regulatory_topics"]
            ).split("\n")

    context["paginator"] = paginator
    context["results"] = paginated_documents
    context["results_count"] = len(paginated_documents)
    context["is_paginated"] = paginator.num_pages > 1
    context["results_total_count"] = paginator.count
    context["results_page_total"] = paginator.num_pages
    # Report the page that was actually served. When one of the fallbacks
    # above kicked in, config.offset still holds the invalid requested value
    # and would disagree with results/start_index/end_index.
    context["current_page"] = paginated_documents.number
    context["start_index"] = paginated_documents.start_index()
    context["end_index"] = paginated_documents.end_index()

    return context
39 changes: 39 additions & 0 deletions orp/orp_search/utils/results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from datetime import datetime, timezone

import dateutil.parser # type: ignore


def parse_date(date_value):
    """
    Convert a date value into a timezone-aware ``datetime``.

    :param date_value: A ``datetime`` instance or a date string.
    :return: A timezone-aware ``datetime``; offset-naive values are tagged
        as UTC and already-aware values are returned unchanged. ``None`` is
        returned for unparseable strings and for any other input type.
    """
    if isinstance(date_value, datetime):
        parsed = date_value
    elif isinstance(date_value, str):
        try:
            parsed = dateutil.parser.parse(date_value)
        except (ValueError, OverflowError):
            # dateutil raises ValueError (ParserError) for malformed input
            # and OverflowError for out-of-range values; both mean "no
            # usable date" here rather than a reason to abort the caller.
            return None
    else:
        return None  # Return None for invalid date types

    if parsed.tzinfo is None:
        # If the datetime is offset-naive, make it offset-aware in UTC
        return parsed.replace(tzinfo=timezone.utc)
    return parsed


def calculate_score(search_result, search_terms):
    """
    Calculate the score of a search result based on the number of
    search terms found in the title and description.

    Matching is case-insensitive and counts every non-overlapping
    occurrence of each term as a substring.

    :param search_result: A dictionary containing the search result.
        Missing or ``None`` ``title``/``description`` count as empty.
    :param search_terms: A list of search terms to look for in the
        search result. ``None`` and empty terms are ignored.
    :return: The score based on the number of search terms found.
    """
    title = search_result.get("title", "") or ""
    description = search_result.get("description", "") or ""
    combined_content = title.lower() + " " + description.lower()
    # str.count("") returns len(s) + 1, so an empty term would swamp the
    # score with a value unrelated to relevance; skip such terms.
    return sum(
        combined_content.count(term.lower())
        for term in (search_terms or ())
        if term
    )
117 changes: 117 additions & 0 deletions orp/orp_search/utils/terms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import re


def sanitize_input(search):
    """
    Strip characters and keywords commonly used in SQL injection attacks.

    The following are removed (keywords case-insensitively and only as
    whole words): ``--``, ``SELECT``, ``INSERT``, ``DELETE``, ``UPDATE``,
    ``DROP``, ``ALTER``, ``EXEC``, ``UNION``, ``CREATE``, single quotes,
    double quotes and semicolons. Surrounding whitespace is then trimmed.

    Note that this also removes those keywords when they are used as
    ordinary search words (for example "update"), and that it is a simple
    denylist rather than a replacement for parameterised queries.

    :param search: The raw search string; ``None`` or an empty string
        yields an empty string.
    :return: The sanitized, whitespace-trimmed search string.
    """
    # Nothing to sanitize; also avoids a TypeError from re.sub on None
    if not search:
        return ""

    sanitized_search = re.sub(
        r"(--|\b(SELECT|INSERT|DELETE|UPDATE|DROP|ALTER|EXEC|UNION"
        r"|CREATE)\b|'|\"|;)",
        "",
        search,
        flags=re.IGNORECASE,
    )
    return sanitized_search.strip()


def parse_search_terms(search):
    """
    Split a raw search string into AND terms and OR terms.

    The input is sanitized first. ``AND``, ``OR`` and ``+`` act as
    connectors (``+`` is equivalent to ``AND``); a whole token of
    ``and``/``or`` in any letter case is also treated as a connector. A
    connector applies to every following term until another connector
    appears, and terms seen before any connector are AND terms.

    Words wrapped in double quotes are kept together as a single phrase.
    NOTE(review): ``sanitize_input`` currently strips double quotes, so the
    quoted-phrase handling only takes effect if that sanitization changes.

    :param search: The raw search string.
    :return: A tuple ``(search_terms_and, search_terms_or)`` of lists.
    """
    # Sanitize input before processing
    search = sanitize_input(search)

    search_terms_and = []
    search_terms_or = []

    # A query containing only connectors/whitespace has no terms at all
    if re.fullmatch(r"(AND|OR|\+|\s)+", search):
        return search_terms_and, search_terms_or

    # Split on whitespace and connectors, keeping the connectors as tokens
    tokens = re.split(r"(\s+|\bAND\b|\bOR\b|\+)", search)

    phrase_parts = []  # words of the quoted phrase being collected
    in_quotes = False
    current_connector = None  # None, "AND" or "OR"

    def add_term(term):
        # File the term under the connector currently in force; terms seen
        # before any connector count as AND terms. Empty terms are dropped.
        if not term:
            return
        if current_connector == "OR":
            search_terms_or.append(term)
        else:
            search_terms_and.append(term)

    for token in tokens:
        token = token.strip()
        if not token:
            continue

        opens_quote = token.startswith('"')
        closes_quote = token.endswith('"')

        if opens_quote and closes_quote and len(token) > 1:
            # Complete quoted term in one token
            add_term(token.strip('"'))
            continue
        if in_quotes and closes_quote:
            # End of a multi-word phrase. One buffer is used for the whole
            # phrase so its first word is kept whichever connector applies.
            phrase_parts.append(token.strip('"'))
            add_term(" ".join(part for part in phrase_parts if part))
            phrase_parts = []
            in_quotes = False
            continue
        if opens_quote:
            # Start of a multi-word phrase (restarts any unfinished one)
            in_quotes = True
            phrase_parts = [token.strip('"')]
            continue
        if in_quotes:
            # Inside quotes every token, connectors included, is phrase text
            phrase_parts.append(token)
            continue

        # Treat both + and AND as equivalent for "AND" logic
        if token.upper() == "AND" or token == "+":  # nosec BXXX
            current_connector = "AND"
        elif token.upper() == "OR":
            current_connector = "OR"
        else:
            # Individual term outside quotes; a stray closing quote is
            # stripped rather than causing the word to be discarded.
            add_term(token.strip('"'))

    if in_quotes:
        # Unterminated quote: keep the collected words instead of losing them
        add_term(" ".join(part for part in phrase_parts if part))

    return search_terms_and, search_terms_or


def combine_search_terms(search_terms_and, search_terms_or):
    """
    Build a single search expression from AND terms and OR terms.

    The AND terms are joined with ``" AND "`` and the OR terms with
    ``" OR "``. When both groups are non-empty they are concatenated with
    ``" OR "`` between them (no parentheses are added); otherwise the
    non-empty group, or an empty string, is returned.

    :param search_terms_and: Terms to join with ``AND``; may be empty.
    :param search_terms_or: Terms to join with ``OR``; may be empty.
    :return: The combined search expression string.
    """
    and_clause = " AND ".join(search_terms_and or [])
    or_clause = " OR ".join(search_terms_or or [])

    # At most one clause present: return it (or "" when neither is)
    if not (and_clause and or_clause):
        return and_clause or or_clause

    return f"{and_clause} OR {or_clause}"
87 changes: 6 additions & 81 deletions orp/orp_search/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@
import csv
import logging

from datetime import datetime, timezone

import dateutil.parser # type: ignore
import pandas as pd

from orp_search.legislation import Legislation
from orp_search.public_gateway import PublicGateway, SearchDocumentConfig
from orp_search.utils.paginate import paginate
from orp_search.utils.results import calculate_score, parse_date

from django.conf import settings
from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator
from django.http import HttpRequest, HttpResponse
from django.shortcuts import redirect, render
from django.views.decorators.http import require_http_methods
Expand Down Expand Up @@ -161,42 +159,6 @@ def download_search_csv(request: HttpRequest) -> HttpResponse:
return response


def _parse_date(date_value):
    """
    Convert a date value into a timezone-aware ``datetime``.

    :param date_value: A ``datetime`` instance or a date string.
    :return: A timezone-aware ``datetime``; offset-naive values are tagged
        as UTC and already-aware values are returned unchanged. ``None`` is
        returned for unparseable strings and for any other input type.
    """
    if isinstance(date_value, datetime):
        parsed = date_value
    elif isinstance(date_value, str):
        try:
            parsed = dateutil.parser.parse(date_value)
        except (ValueError, OverflowError):
            # dateutil raises ValueError (ParserError) for malformed input
            # and OverflowError for out-of-range values; both mean "no
            # usable date" here rather than a reason to abort the caller.
            return None
    else:
        return None  # Return None for invalid date types

    if parsed.tzinfo is None:
        # If the datetime is offset-naive, make it offset-aware in UTC
        return parsed.replace(tzinfo=timezone.utc)
    return parsed


def _calculate_score(search_result, search_terms):
    """
    Calculate the score of a search result based on the number of
    search terms found in the title and description.

    Matching is case-insensitive and counts every non-overlapping
    occurrence of each term as a substring.

    :param search_result: A dictionary containing the search result.
        Missing or ``None`` ``title``/``description`` count as empty.
    :param search_terms: A list of search terms to look for in the
        search result. ``None`` and empty terms are ignored.
    :return: The score based on the number of search terms found.
    """
    title = search_result.get("title", "") or ""
    description = search_result.get("description", "") or ""
    combined_content = title.lower() + " " + description.lower()
    # str.count("") returns len(s) + 1, so an empty term would swamp the
    # score with a value unrelated to relevance; skip such terms.
    return sum(
        combined_content.count(term.lower())
        for term in (search_terms or ())
        if term
    )


@require_http_methods(["GET"])
def search(request: HttpRequest) -> HttpResponse:
"""Search view.
Expand Down Expand Up @@ -253,7 +215,7 @@ def search(request: HttpRequest) -> HttpResponse:

# Get the search results from the Data API using PublicGateway class
config = SearchDocumentConfig(
str(search_query).lower(),
search_query,
document_types,
dummy=True,
limit=limit,
Expand Down Expand Up @@ -287,57 +249,20 @@ def search(request: HttpRequest) -> HttpResponse:
if sort_by == "recent":
search_results = sorted(
search_results,
key=lambda x: _parse_date(x["date_modified"]),
key=lambda x: parse_date(x["date_modified"]),
reverse=True,
)
elif sort_by == "relevance":
# Add the 'score' to each search result
for result in search_results:
logger.info("result to pass to calculate score: %s", result)
result["score"] = _calculate_score(result, config.search_terms)
result["score"] = calculate_score(result, config.search_terms)

search_results = sorted(
search_results,
key=lambda x: x["score"],
reverse=True,
)

# Paginate results
paginator = Paginator(search_results, config.limit)
try:
paginated_documents = paginator.page(config.offset)
except PageNotAnInteger:
paginated_documents = paginator.page(1)
except EmptyPage:
paginated_documents = paginator.page(paginator.num_pages)

# Iterate over each document in paginated_documents
if paginated_documents:
for paginated_document in paginated_documents:
if "description" in paginated_document:
description = paginated_document["description"]

# If description is not an empty string
if description:
# Truncate description to 100 characters
paginated_document["description"] = (
description[:100] + "..."
if len(description) > 100
else description
)
if "regulatory_topics" in paginated_document:
paginated_document["regulatory_topics"] = str(
paginated_document["regulatory_topics"]
).split("\n")

context["paginator"] = paginator
context["results"] = paginated_documents
context["results_count"] = len(paginated_documents)
context["is_paginated"] = paginator.num_pages > 1
context["results_total_count"] = paginator.count
context["results_page_total"] = paginator.num_pages
context["current_page"] = config.offset
context["start_index"] = paginated_documents.start_index()
context["end_index"] = paginated_documents.end_index()

context = paginate(context, config, search_results)
return render(request, template_name="orp.html", context=context)

0 comments on commit ee0569e

Please sign in to comment.