chore: remove redundant pagination and outdated XML parsing
This commit removes unused pagination imports and legacy XML parsing functions. It also streamlines search result handling by combining legislation results directly and improves pagination in the search view for better performance and maintainability.
hareshkainthdbt committed Oct 28, 2024
1 parent 34ce4fe commit 8b153fc
Showing 8 changed files with 209 additions and 282 deletions.
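The pagination change amounts to paginating one combined result list in the search view, rather than paginating each source separately. A minimal sketch of that pattern, using Django's Paginator the same way the legislation diff below does (the view function and the public_gov_search helper are illustrative names, not the repository's actual API):

from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator

def search_results(config, context):
    # Combine results from every source first, then paginate once
    results = public_gov_search(config) + Legislation().search(config)

    paginator = Paginator(results, config.limit)
    try:
        page = paginator.page(config.offset)
    except PageNotAnInteger:
        page = paginator.page(1)
    except EmptyPage:
        page = paginator.page(paginator.num_pages)

    context["paginator"] = paginator
    context["results"] = page
    context["results_total_count"] = paginator.count
    context["is_paginated"] = paginator.num_pages > 1
    return context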
2 changes: 1 addition & 1 deletion orp/core/templates/accessibility_statement.html
@@ -135,4 +135,4 @@ <h2 class="govuk-heading-m">Preparation of this statement</h2>
</div>
</main>
</div>
{% endblock %}
{% endblock %}
2 changes: 1 addition & 1 deletion orp/core/templates/home.html
@@ -19,4 +19,4 @@ <h1 class="govuk-heading-xl">Open Regulation Platform</h1>
</svg>
</a>
</div>
{% endblock %}
{% endblock %}
20 changes: 20 additions & 0 deletions orp/orp_search/config.py
@@ -62,3 +62,23 @@ def validate(self):
logger.error("sort_by must be 'recent' or 'relevance'")
return False
return True

def build_search_term(self):
    # Rules for config.search_terms:
    # 1. If search terms are empty, return an empty string
    # 2. If a search term begins and ends with a quote,
    #    treat it as a phrase
    # 3. If a search term contains a + between two terms,
    #    treat it as an AND search
    # 4. Terms separated by a space are combined
    #    as an OR search

    if not self.search_terms:
        return ""

    search_term_tmp = []

    for term in self.search_terms:
        if term.startswith('"') and term.endswith('"'):
            # Already a quoted phrase; keep it intact rather than
            # wrapping it in a second pair of quotes
            search_term_tmp.append(term)
        elif "+" in term:
            search_term_tmp.append(term.replace("+", " AND "))
        else:
            search_term_tmp.append(term)

    return " OR ".join(search_term_tmp)
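For illustration, the rules above combine like this (assuming SearchDocumentConfig can be built from a plain list of terms; the constructor is not shown in this diff):

config = SearchDocumentConfig(search_terms=['"data protection"', "tax+income", "trade"])
config.build_search_term()
# -> '"data protection" OR tax AND income OR trade'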
268 changes: 120 additions & 148 deletions orp/orp_search/legislation.py
@@ -1,176 +1,148 @@
import base64
import logging
import xml.etree.ElementTree as ET # nosec BXXX

from io import StringIO

import pandas as pd
import requests # type: ignore

from orp_search.config import SearchDocumentConfig

from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator

logger = logging.getLogger(__name__)


def _extract_td_value(html_content, target_text):
# Step 1: Locate the <h2> element and the subsequent <table> element
h2_text = '<h2 class="title">Count Results</h2>'
table_class = '<table class="results results-single query-builder"'
start_index = html_content.find(h2_text)
if start_index == -1:
raise ValueError("specified <h2> text not found in the HTML content")

start_index = html_content.find(table_class, start_index)
if start_index == -1:
raise ValueError(
"specified <table> class not found in the HTML content"
)

# Step 2: Locate the <th> tag with the target text
th_start = html_content.find(f"<th>{target_text}</th>", start_index)
if th_start == -1:
raise ValueError(
f"<th>{target_text}</th> not found in the HTML content"
)

# Step 3: Find the <td> tag immediately following the located <th> tag
td_start = html_content.find("<td>", th_start)
if td_start == -1:
raise ValueError("no <td> tag found after the specified <th> tag")

td_end = html_content.find("</td>", td_start)
if td_end == -1:
raise ValueError(
"No closing </td> tag found after the specified <th> tag"
)

# Step 4: Extract and return the content within the <td> tag
td_value = html_content[ # noqa: E203
td_start + len("<td>") : td_end # noqa: E203
].strip() # noqa: E203
return td_value


def _perform_request(url, params, timeout=10):
logger.info(f"url for request: {url}")
logger.info(f"params for request: {params}")
response = requests.get(url, params=params, timeout=timeout)
return response.text if response.status_code == 200 else None


def _encode_url(url):
encoded_bytes = base64.urlsafe_b64encode(url.encode("utf-8"))
return encoded_bytes.decode("utf-8")
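_encode_url turns a document URL into a URL-safe token that can serve as a result id; decoding is the mirror image. A hypothetical round trip using only the standard library:

encoded = _encode_url("https://www.legislation.gov.uk/id/ukpga/2018/12")
decoded = base64.urlsafe_b64decode(encoded.encode("utf-8")).decode("utf-8")
# decoded == "https://www.legislation.gov.uk/id/ukpga/2018/12"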


class Legislation:
def __init__(self):
self.search_url = (
"https://research.legislation.gov.uk/query-builder/search/data.csv"
)
self.count_url = (
"https://research.legislation.gov.uk/query-builder/count"
)
self.search_url = "https://www.legislation.gov.uk/search"

def search(self, config: SearchDocumentConfig):
logger.info("searching legislation...")

# List of search terms
title_search_terms = config.search_terms
search_terms = ",".join(title_search_terms)
headers = {"Accept": "application/atom+xml"}
params = {
"amendments": "include",
"query": search_terms,
"count": "100",
"lang": "en",
"title": search_terms,
"text": search_terms,
"results-count": 100,
}

# Get search results
data_csv = _perform_request(self.search_url, params, config.timeout)

# Convert the response (string) to a file-like object
data_io = StringIO(data_csv)

# Read the CSV string into a DataFrame
df = pd.read_csv(data_io)

results = []
# Convert data_csv into data api format and to list
for index, item in df.iterrows():
results.append(
{
"id": _encode_url(item["id"]),
"title": item["title"],
"document_type": "legislation",
"publisher_id": item["type"],
"publisher": "UK Legislation",
"type": "Legislation",
"date_modified": item["valid"],
    }
)
# Register namespaces
ET.register_namespace("", "http://www.w3.org/2005/Atom")
ET.register_namespace(
"leg", "http://www.legislation.gov.uk/namespaces/legislation"
)
ET.register_namespace(
"openSearch", "http://a9.com/-/spec/opensearch/1.1/"
)

# Namespace dictionary
ns = {
"": "http://www.w3.org/2005/Atom",
"leg": "http://www.legislation.gov.uk/namespaces/legislation",
"ukm": "http://www.legislation.gov.uk/namespaces/metadata",
"theme": "http://www.legislation.gov.uk/namespaces/theme",
"openSearch": "http://a9.com/-/spec/opensearch/1.1/",
}

def _do_request():
    try:
        # Get search results and parse XML data (root)
        response = requests.get(
            self.search_url,
            params=params,
            headers=headers,
            timeout=config.timeout,
        )
        if response.status_code == 200:
            root = ET.fromstring(
                response.content.decode("utf-8")
            )  # nosec BXXX
        else:
            root = None

logger.info(f"legislation total results: {len(results)}")
return results

def finalise_results(
self, config: SearchDocumentConfig, results, context
) -> dict:
# title_search_terms = config.search_terms
# search_terms = ",".join(title_search_terms)
# params = {
# "amendments": "include",
# "query": search_terms,
# # 'counting': 'documents',
# }

# # Get count of total results
# count_data_html_page = _perform_request(
# self.count_url, params, config.timeout
# )
# total_document_count = _extract_td_value(
# count_data_html_page, "documents"
# )

paginated_documents = []
exists = False

# Check if paginator exists in context
if "paginator" not in context:
logger.info("paginator not in context for legislation")
context["paginator"] = {}
paginator = Paginator(results, config.limit)
try:
paginated_documents = paginator.page(config.offset)
except PageNotAnInteger:
paginated_documents = paginator.page(1)
except EmptyPage:
paginated_documents = paginator.page(paginator.num_pages)
else:
logger.info("paginator exists in context for legislation")
exists = True
paginator = context["paginator"]

# If paginator exists then add all results to paginator
if exists:
all_items = paginator.object_list

# Convert to a list if necessary
all_non_legislation_items = list(all_items)

# Combine with legislation results
all_items = all_non_legislation_items + results

paginator = Paginator(all_items, config.limit)
try:
paginated_documents = paginator.page(config.offset)
except PageNotAnInteger:
paginated_documents = paginator.page(1)
except EmptyPage:
paginated_documents = paginator.page(paginator.num_pages)

context["current_page"] = config.offset
context["paginator"] = paginator
context["is_paginated"] = paginator.num_pages > 1
context["results_total_count"] = paginator.count
context["results"] = paginated_documents
# context["start_index"] = paginated_documents.start_index()
# context["end_index"] = paginated_documents.end_index()
return context
# Extract pagination values
page_data = {
"page": (
root.find(".//leg:page", ns).text
if root.find(".//leg:page", ns) is not None
else None
),
"morePages": (
root.find(".//leg:morePages", ns).text
if root.find(".//leg:morePages", ns) is not None
else None
),
}

logger.info(f"legislation page data: {page_data}")
return root, page_data
except Exception as e:
logger.error(f"error fetching legislation: {e}")
return None, None

root, page_data = _do_request()

if not root:
return []

all_entries = []

def _extract_entries(root):
# Extract entries
entries = []
for entry in root.findall("entry", ns):
entry_id = (
entry.find("id", ns).text
if entry.find("id", ns) is not None
else None
)
title = (
entry.find("title", ns).text
if entry.find("title", ns) is not None
else None
)
updated = (
entry.find("updated", ns).text
if entry.find("updated", ns) is not None
else None
)
published = (
entry.find("published", ns).text
if entry.find("published", ns) is not None
else "N/A"
) # Placeholder if missing
summary = (
entry.find("summary", ns).text
if entry.find("summary", ns) is not None
else "N/A"
) # Placeholder if missing
entries.append(
{
"id": _encode_url(entry_id),
"title": title,
"date_modified": updated if updated else published,
"publisher": "Legislation",
"description": summary,
"type": "Legislation",
}
)
return entries

all_entries += _extract_entries(root)

morePages = int(page_data["morePages"] or 0)
logger.info(f"legislation more pages: {morePages}")
if morePages > 1:
    # Get remaining pages; this assumes the search endpoint
    # accepts a "page" query parameter for paging
    for page in range(2, morePages + 1):
        params["page"] = page
        root, _ = _do_request()
        if root is not None:
            all_entries += _extract_entries(root)

logger.info(f"legislation total results: {len(all_entries)}")
return all_entries
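End to end, the new Atom-based search can be exercised roughly like this (a sketch; it assumes SearchDocumentConfig takes search terms and a timeout, which this diff does not show):

config = SearchDocumentConfig(search_terms=["environment"], timeout=10)
results = Legislation().search(config)
for entry in results[:5]:
    print(entry["title"], entry["date_modified"])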